Skip to content

Commit

Permalink
Removing Operator specific handling during a StudyJob run (#387)
Browse files Browse the repository at this point in the history
* Removing Operator specific handling during a StudyJob run

* Return empty in error
  • Loading branch information
johnugeorge authored and k8s-ci-robot committed Feb 20, 2019
1 parent edecd39 commit 8a89b9e
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 69 deletions.
94 changes: 55 additions & 39 deletions pkg/controller/studyjob/studyjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
batchv1beta "k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -120,7 +122,6 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
if isFatalWatchError(err, TFJobWorker) {
return err
}

err = c.Watch(
&source.Kind{Type: &pytorchjobv1beta1.PyTorchJob{}},
&handler.EnqueueRequestForOwner{
Expand Down Expand Up @@ -296,15 +297,15 @@ func (r *ReconcileStudyJobController) checkGoal(instance *katibv1alpha1.StudyJob
return goal, nil
}

func (r *ReconcileStudyJobController) deleteWorkerResources(instance *katibv1alpha1.StudyJob, ns string, w *katibv1alpha1.WorkerCondition) error {
wid := w.WorkerID
obj := createWorkerJobObj(w.Kind)
func (r *ReconcileStudyJobController) deleteWorkerResources(instance *katibv1alpha1.StudyJob, ns string, wid string, wkind *schema.GroupVersionKind) error {
nname := types.NamespacedName{Namespace: ns, Name: wid}
var wretain, mcretain bool = false, false
if instance.Spec.WorkerSpec != nil {
wretain = instance.Spec.WorkerSpec.Retain
}
if !wretain {
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(*wkind)
joberr := r.Client.Get(context.TODO(), nname, obj)
if joberr == nil {
if err := r.Delete(context.TODO(), obj, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil {
Expand Down Expand Up @@ -393,46 +394,56 @@ func (r *ReconcileStudyJobController) updateWorker(c katibapi.ManagerClient, ins
return update, nil
}

func (r *ReconcileStudyJobController) getJobWorkerStatus(w *katibv1alpha1.WorkerCondition, ns string) WorkerStatus {
runtimejob := createWorkerJobObj(w.Kind)
nname := types.NamespacedName{Namespace: ns, Name: w.WorkerID}
joberr := r.Client.Get(context.TODO(), nname, runtimejob)
if joberr != nil {
return WorkerStatus{}
}
func (r *ReconcileStudyJobController) getJobWorkerStatus(ns string, wid string, wkind *schema.GroupVersionKind) WorkerStatus {
nname := types.NamespacedName{Namespace: ns, Name: wid}
var state katibapi.State = katibapi.State_RUNNING
var cpTime *metav1.Time
switch w.Kind {
switch wkind.Kind {

case DefaultJobWorker:
job := runtimejob.(*batchv1.Job)
var job batchv1.Job
if err := r.Client.Get(context.TODO(), nname, &job); err != nil {
log.Printf("Client Get error %v for %v", err, nname)
return WorkerStatus{}
}
if job.Status.Active == 0 && job.Status.Succeeded > 0 {
state = katibapi.State_COMPLETED
} else if job.Status.Failed > 0 {
state = katibapi.State_ERROR
}
cpTime = job.Status.CompletionTime
case TFJobWorker:
job := runtimejob.(*tfjobv1beta1.TFJob)
if len(job.Status.Conditions) > 0 {
lc := job.Status.Conditions[len(job.Status.Conditions)-1]
if lc.Type == commonv1beta1.JobSucceeded {
state = katibapi.State_COMPLETED
} else if lc.Type == commonv1beta1.JobFailed {
state = katibapi.State_ERROR
}

default:
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(*wkind)
if err := r.Client.Get(context.TODO(), nname, u); err != nil {
log.Printf("Client Get error %v for %v", err, nname)
return WorkerStatus{}
}
cpTime = job.Status.CompletionTime
case PyTorchJobWorker:
job := runtimejob.(*pytorchjobv1beta1.PyTorchJob)
if len(job.Status.Conditions) > 0 {
lc := job.Status.Conditions[len(job.Status.Conditions)-1]
if lc.Type == commonv1beta1.JobSucceeded {
state = katibapi.State_COMPLETED
} else if lc.Type == commonv1beta1.JobFailed {
state = katibapi.State_ERROR
status, ok, unerr := unstructured.NestedFieldCopy(u.Object, "status")

if ok {
statusMap := status.(map[string]interface{})
jobStatus := commonv1beta1.JobStatus{}
err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus)
if err != nil {
log.Printf("Error in converting unstructured to status: %v ", err)
return WorkerStatus{}
}
if len(jobStatus.Conditions) > 0 {
lc := jobStatus.Conditions[len(jobStatus.Conditions)-1]
if lc.Type == commonv1beta1.JobSucceeded {
state = katibapi.State_COMPLETED
} else if lc.Type == commonv1beta1.JobFailed {
state = katibapi.State_ERROR
}
}
cpTime = jobStatus.CompletionTime

} else if unerr != nil {
log.Printf("Error in getting Job Status from unstructured: %v", unerr)
return WorkerStatus{}
}
cpTime = job.Status.CompletionTime
}
return WorkerStatus{
CompletionTime: cpTime,
Expand All @@ -459,19 +470,24 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ
}
defer conn.Close()
c := katibapi.NewManagerClient(conn)
wkind, err := getWorkerKind(instance.Spec.WorkerSpec)
if err != nil {
log.Printf("getWorkerKind error %v", err)
return false, err
}
for i, t := range instance.Status.Trials {
for j, w := range t.WorkerList {
if w.Condition == katibv1alpha1.ConditionCompleted || w.Condition == katibv1alpha1.ConditionFailed {
if w.ObjectiveValue == nil && w.Condition == katibv1alpha1.ConditionCompleted {
cwids = append(cwids, w.WorkerID)
}
if err := r.deleteWorkerResources(instance, ns, &w); err != nil {
if err := r.deleteWorkerResources(instance, ns, w.WorkerID, wkind); err != nil {
return false, err
}
continue
}
nextSuggestionSchedule = false
js := r.getJobWorkerStatus(&w, ns)
js := r.getJobWorkerStatus(ns, w.WorkerID, wkind)
update, err = r.updateWorker(c, instance, js, ns, cwids[0:], i, j)
}
}
Expand Down Expand Up @@ -545,7 +561,7 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha
return true, err
}
for _, t := range trials {
wid, err := r.spawnWorker(instance, c, instance.Status.StudyID, t, instance.Spec.WorkerSpec, wkind, false)
wid, err := r.spawnWorker(instance, c, instance.Status.StudyID, t, instance.Spec.WorkerSpec, wkind.Kind, false)
if err != nil {
log.Printf("Spawn worker error %v", err)
instance.Status.Condition = katibv1alpha1.ConditionFailed
Expand All @@ -558,7 +574,7 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha
WorkerList: []katibv1alpha1.WorkerCondition{
katibv1alpha1.WorkerCondition{
WorkerID: wid,
Kind: wkind,
Kind: wkind.Kind,
Condition: katibv1alpha1.ConditionCreated,
StartTime: metav1.Now(),
},
Expand Down Expand Up @@ -596,12 +612,12 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ
return "", err
}
BUFSIZE := 1024
job := createWorkerJobObj(wkind)
job := &unstructured.Unstructured{}
if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(job); err != nil {
log.Printf("Yaml decode error %v", err)
return "", err
}
if err := controllerutil.SetControllerReference(instance, job.(metav1.Object), r.scheme); err != nil {
if err := controllerutil.SetControllerReference(instance, job, r.scheme); err != nil {
log.Printf("SetControllerReference error %v", err)
return "", err
}
Expand All @@ -621,7 +637,7 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp
log.Printf("getWorkerKind error %v", err)
return err
}
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, mcs)
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind.Kind, namespace, mcs)
if err != nil {
log.Printf("getMetricsCollectorManifest error %v", err)
return err
Expand Down
53 changes: 23 additions & 30 deletions pkg/controller/studyjob/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,14 @@ import (

katibapi "github.com/kubeflow/katib/pkg/api"
katibv1alpha1 "github.com/kubeflow/katib/pkg/api/operators/apis/studyjob/v1alpha1"
pytorchjobv1beta1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1beta1"
tfjobv1beta1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta1"

batchv1 "k8s.io/api/batch/v1"
batchv1beta "k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
)

func createWorkerJobObj(kind string) runtime.Object {
switch kind {
case DefaultJobWorker:
return &batchv1.Job{}
case TFJobWorker:
return &tfjobv1beta1.TFJob{}
case PyTorchJobWorker:
return &pytorchjobv1beta1.PyTorchJob{}
}
return nil
}

func validateWorkerResource(wkind string) error {
for _, crd := range invalidCRDResources {
if crd == wkind {
Expand All @@ -62,7 +47,7 @@ func isFatalWatchError(err error, job string) bool {
}
}

func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) {
func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (*schema.GroupVersionKind, error) {
var typeChecker interface{}
BUFSIZE := 1024
_, m, err := getWorkerManifest(
Expand All @@ -78,30 +63,39 @@ func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) {
true,
)
if err != nil {
return "", err
return nil, err
}
if err := k8syaml.NewYAMLOrJSONDecoder(m, BUFSIZE).Decode(&typeChecker); err != nil {
log.Printf("Yaml decode validation error %v", err)
return "", err
return nil, err
}
tcMap, ok := typeChecker.(map[string]interface{})
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
return nil, fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
wkind, ok := tcMap["kind"]
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
return nil, fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
wkindS, ok := wkind.(string)
if !ok {
return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
return nil, fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}
apiVersion, ok := tcMap["apiVersion"]
if !ok {
return nil, fmt.Errorf("Cannot get apiVersion of worker %v", typeChecker)
}
apiVersionS, ok := apiVersion.(string)
if !ok {
return nil, fmt.Errorf("Cannot get apiVersion of worker %v", typeChecker)
}
for _, kind := range ValidWorkerKindList {
if kind == wkindS {
return wkindS, validateWorkerResource(kind)
workerGVK := schema.FromAPIVersionAndKind(apiVersionS, kind)
return &workerGVK, validateWorkerResource(kind)
}
}
return "", fmt.Errorf("Invalid kind of worker %v", typeChecker)
return nil, fmt.Errorf("Invalid kind of worker %v", typeChecker)
}

func validateStudy(instance *katibv1alpha1.StudyJob, namespace string) error {
Expand All @@ -125,27 +119,26 @@ func validateStudy(instance *katibv1alpha1.StudyJob, namespace string) error {
ParameterSet: []*katibapi.Parameter{},
},
instance.Spec.WorkerSpec,
wkind,
wkind.Kind,
namespace,
true,
)
if err != nil {
return err
}

job := createWorkerJobObj(wkind)
job := &unstructured.Unstructured{}
if err := k8syaml.NewYAMLOrJSONDecoder(wm, BUFSIZE).Decode(job); err != nil {
log.Printf("Yaml decode error %v", err)
return err
}

metav1Job := job.(metav1.Object)
if metav1Job.GetNamespace() != namespace || metav1Job.GetName() != workerID {
if job.GetNamespace() != namespace || job.GetName() != workerID {
return fmt.Errorf("Invalid worker template.")
}

var mcjob batchv1beta.CronJob
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind, namespace, instance.Spec.MetricsCollectorSpec)
mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, wkind.Kind, namespace, instance.Spec.MetricsCollectorSpec)
if err != nil {
log.Printf("getMetricsCollectorManifest error %v", err)
return err
Expand Down

0 comments on commit 8a89b9e

Please sign in to comment.