Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync up worker status all the time #299

Merged
merged 1 commit into from
Dec 20, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 16 additions & 33 deletions pkg/controller/studyjob/studyjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,57 +348,47 @@ func (r *ReconcileStudyJobController) updateWorker(c katibapi.ManagerClient, ins
if ctime != nil && cjob.Status.LastScheduleTime != nil {
if ctime.Before(cjob.Status.LastScheduleTime) && len(cjob.Status.Active) == 0 {
saveModel(c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, wid)
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionCompleted
instance.Status.Trials[i].WorkerList[j].CompletionTime = metav1.Now()
update = true
_, err := c.UpdateWorkerState(
context.Background(),
&katibapi.UpdateWorkerStateRequest{
WorkerId: instance.Status.Trials[i].WorkerList[j].WorkerID,
Status: katibapi.State_COMPLETED,
})
if err != nil {
log.Printf("Fail to update worker info. ID %s", instance.Status.Trials[i].WorkerList[j].WorkerID)
return false, err
}
susp := true
cjob.Spec.Suspend = &susp
if err := r.Update(context.TODO(), cjob); err != nil {
return false, err
}

cwids = append(cwids, wid)
}
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation looks misaligned.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works correctly (I have verified it). The else matches line 347, not line 348.

// For some reason, the metricsCollector for this worker cannot be found (it was accidentally deleted by someone, or it failed to be created).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the job finishes early but the metrics collector has not been spawned yet?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, the metrics (including objectiveMetrics) will not be collected, so the studyJob will have no idea which group of hyperparameters performs better. That is, the studyJob will learn nothing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a worker job changes to "Running", the metrics collector cronjob is spawned at that point. So if a worker job changes to "Complete" but its corresponding metrics collector cronjob cannot be found, I think it was either deleted accidentally by someone or failed to be created.

update = true
instance.Status.Condition = katibv1alpha1.ConditionFailed
}
if update {
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionCompleted
instance.Status.Trials[i].WorkerList[j].CompletionTime = metav1.Now()
cwids = append(cwids, wid)
}
case katibapi.State_RUNNING:
if instance.Status.Trials[i].WorkerList[j].Condition != katibv1alpha1.ConditionRunning {
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionRunning
update = true
}
if errors.IsNotFound(cjoberr) {
r.spawnMetricsCollector(instance, c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, wid, ns, instance.Spec.MetricsCollectorSpec)
}
_, err := c.UpdateWorkerState(
context.Background(),
&katibapi.UpdateWorkerStateRequest{
WorkerId: instance.Status.Trials[i].WorkerList[j].WorkerID,
Status: katibapi.State_RUNNING,
})
if err != nil {
log.Printf("Fail to update worker info. ID %s", instance.Status.Trials[i].WorkerList[j].WorkerID)
return false, err
spawnErr := r.spawnMetricsCollector(instance, c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, wid, ns, instance.Spec.MetricsCollectorSpec)
if spawnErr != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
}
}
case katibapi.State_ERROR:
if instance.Status.Trials[i].WorkerList[j].Condition != katibv1alpha1.ConditionFailed {
instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionFailed
update = true
}
}
if update {
_, err := c.UpdateWorkerState(
context.Background(),
&katibapi.UpdateWorkerStateRequest{
WorkerId: instance.Status.Trials[i].WorkerList[j].WorkerID,
Status: katibapi.State_ERROR,
Status: status.WorkerState,
})
if err != nil {
log.Printf("Fail to update worker info. ID %s", instance.Status.Trials[i].WorkerList[j].WorkerID)
Expand Down Expand Up @@ -612,17 +602,14 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ
BUFSIZE := 1024
job := createWorkerJobObj(wkind)
if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(job); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("Yaml decode error %v", err)
return "", err
}
if err := controllerutil.SetControllerReference(instance, job.(metav1.Object), r.scheme); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("SetControllerReference error %v", err)
return "", err
}
if err := r.Create(context.TODO(), job); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("Job Create error %v", err)
return "", err
}
Expand All @@ -636,7 +623,6 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp
wkind, err := getWorkerKind(instance.Spec.WorkerSpec)
if err != nil {
log.Printf("getWorkerKind error %v", err)
instance.Status.Condition = katibv1alpha1.ConditionFailed
return err
}
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, mcs)
Expand All @@ -646,19 +632,16 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp
}

if err := k8syaml.NewYAMLOrJSONDecoder(mcm, BUFSIZE).Decode(&mcjob); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("MetricsCollector Yaml decode error %v", err)
return err
}

if err := controllerutil.SetControllerReference(instance, &mcjob, r.scheme); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("MetricsCollector SetControllerReference error %v", err)
return err
}

if err := r.Create(context.TODO(), &mcjob); err != nil {
instance.Status.Condition = katibv1alpha1.ConditionFailed
log.Printf("MetricsCollector Job Create error %v", err)
return err
}
Expand Down