From 7a8c0eebb580dc486f39f006636449f0b454e946 Mon Sep 17 00:00:00 2001 From: Johnu George Date: Mon, 15 Apr 2019 14:40:58 +0530 Subject: [PATCH 1/5] Adding initial v1alpha2 controller --- .../operators/apis/experiment/v1alpha2/doc.go | 1 + .../experiment/v1alpha2/experiment_types.go | 24 +-- .../apis/experiment/v1alpha2/register.go | 37 +++++ .../apis/experiment/v1alpha2/util.go | 122 ++++++++++++++++ .../v1alpha2/zz_generated.deepcopy.go | 93 ++++++++++++ pkg/api/operators/apis/trial/v1alpha2/doc.go | 7 +- .../operators/apis/trial/v1alpha2/register.go | 37 +++++ .../apis/trial/v1alpha2/trial_types.go | 12 +- pkg/api/operators/apis/trial/v1alpha2/util.go | 137 ++++++++++++++++++ .../experiment/experiment_controller.go | 129 ++++++++++++++++- .../v1alpha2/experiment/util/api_util.go | 32 ++++ .../v1alpha2/experiment/util/status_util.go | 135 +++++++++++++++++ .../v1alpha2/trial/trial_controller.go | 130 ++++++++++++++++- .../v1alpha2/trial/util/api_util.go | 37 +++++ .../v1alpha2/trial/util/status_util.go | 93 ++++++++++++ 15 files changed, 1004 insertions(+), 22 deletions(-) create mode 100644 pkg/api/operators/apis/experiment/v1alpha2/register.go create mode 100644 pkg/api/operators/apis/experiment/v1alpha2/util.go create mode 100644 pkg/api/operators/apis/trial/v1alpha2/register.go create mode 100644 pkg/api/operators/apis/trial/v1alpha2/util.go create mode 100644 pkg/controller/v1alpha2/experiment/util/api_util.go create mode 100644 pkg/controller/v1alpha2/experiment/util/status_util.go create mode 100644 pkg/controller/v1alpha2/trial/util/api_util.go create mode 100644 pkg/controller/v1alpha2/trial/util/status_util.go diff --git a/pkg/api/operators/apis/experiment/v1alpha2/doc.go b/pkg/api/operators/apis/experiment/v1alpha2/doc.go index 898eaf7ca19..5534c91a13d 100644 --- a/pkg/api/operators/apis/experiment/v1alpha2/doc.go +++ b/pkg/api/operators/apis/experiment/v1alpha2/doc.go @@ -18,5 +18,6 @@ limitations under the License. // +k8s:deepcopy-gen=package,register // +k8s:conversion-gen=github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2 // +k8s:defaulter-gen=TypeMeta +// +kubebuilder:subresource:status // +groupName=experiment.kubeflow.org package v1alpha2 diff --git a/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go b/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go index ffe20e9341e..6429b8f9ecd 100644 --- a/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go +++ b/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go @@ -35,10 +35,13 @@ type ExperimentSpec struct { TrialTemplate *TrialTemplate `json:"trialTemplate,omitempty"` // How many trials can be processed in parallel. - ParallelTrialCount int `json:"parallelTrialCount,omitempty"` + ParallelTrialCount *int `json:"parallelTrialCount,omitempty"` - // Total number of trials to run. - MaxTrialCount int `json:"maxTrialCount,omitempty"` + // Max completed trials to mark experiment as succeeded + MaxTrialCount *int `json:"maxTrialCount,omitempty"` + + // Max failed trials to mark experiment as failed. + MaxFailedTrialCount *int `json:"maxFailedTrialCount,omitempty"` // Whether to retain historical data in DB after deletion. RetainHistoricalData bool `json:"retainHistoricalData,omitempty"` @@ -75,8 +78,8 @@ type ExperimentStatus struct { // Current optimal trial parameters and observations. CurrentOptimalTrial OptimalTrial `json:"currentOptimalTrial,omitempty"` - // How many trials have successfully completed. - TrialsCompleted int `json:"trialsCompleted,omitempty"` + // How many trials have succeeded. + TrialsSucceeded int `json:"trialsSucceeded,omitempty"` // How many trials have failed. TrialsFailed int `json:"trialsFailed,omitempty"` @@ -86,6 +89,9 @@ type ExperimentStatus struct { // How many trials are currently pending. TrialsPending int `json:"trialsPending,omitempty"` + + // How many trials are currently running. + TrialsRunning int `json:"trialsRunning,omitempty"` } type OptimalTrial struct { @@ -206,6 +212,7 @@ type GoTemplate struct { // Structure of the Experiment custom resource. // +k8s:openapi-gen=true +// +kubebuilder:subresource:status type Experiment struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` @@ -242,7 +249,6 @@ type Operation struct { Parameters []ParameterSpec `json:"parameterconfigs,omitempty"` } -// TODO - enable this during API implementation. -//func init() { -// SchemeBuilder.Register(&Experiment{}, &ExperimentList{}) -//} +func init() { + SchemeBuilder.Register(&Experiment{}, &ExperimentList{}) +} diff --git a/pkg/api/operators/apis/experiment/v1alpha2/register.go b/pkg/api/operators/apis/experiment/v1alpha2/register.go new file mode 100644 index 00000000000..818623de0d5 --- /dev/null +++ b/pkg/api/operators/apis/experiment/v1alpha2/register.go @@ -0,0 +1,37 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package v1alpha2 contains API Schema definitions for the experiment v1alpha2 API group +// +k8s:openapi-gen=true +// +k8s:deepcopy-gen=package,register +// +k8s:conversion-gen=github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2 +// +k8s:defaulter-gen=TypeMeta +// +kubebuilder:subresource:status +// +groupName=experiments.kubeflow.org +package v1alpha2 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/runtime/scheme" +) + +var ( + // SchemeGroupVersion is group version used to register these objects + SchemeGroupVersion = schema.GroupVersion{Group: "kubeflow.org", Version: "v1alpha2"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme + SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion} + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/pkg/api/operators/apis/experiment/v1alpha2/util.go b/pkg/api/operators/apis/experiment/v1alpha2/util.go new file mode 100644 index 00000000000..bc9a1440a09 --- /dev/null +++ b/pkg/api/operators/apis/experiment/v1alpha2/util.go @@ -0,0 +1,122 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha2 + +import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func getCondition(exp *Experiment, condType ExperimentConditionType) *ExperimentCondition { + for _, condition := range exp.Status.Conditions { + if condition.Type == condType { + return &condition + } + } + return nil +} + +func hasCondition(exp *Experiment, condType ExperimentConditionType) bool { + cond := getCondition(exp, condType) + if cond != nil && cond.Status == v1.ConditionTrue { + return true + } + return false +} + +func (exp *Experiment) removeCondition(condType ExperimentConditionType) { + var newConditions []ExperimentCondition + for _, c := range exp.Status.Conditions { + + if c.Type == condType { + continue + } + + newConditions = append(newConditions, c) + } + exp.Status.Conditions = newConditions +} + +func newCondition(conditionType ExperimentConditionType, status v1.ConditionStatus, reason, message string) ExperimentCondition { + return ExperimentCondition{ + Type: conditionType, + Status: status, + LastUpdateTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: message, + } +} + +func (exp *Experiment) IsCreated() bool { + return hasCondition(exp, ExperimentCreated) +} + +func (exp *Experiment) IsSucceeded() bool { + return hasCondition(exp, ExperimentSucceeded) +} + +func (exp *Experiment) IsFailed() bool { + return hasCondition(exp, ExperimentFailed) +} + +func (exp *Experiment) IsCompleted() bool { + return exp.IsSucceeded() || exp.IsFailed() +} + +func (exp *Experiment) setCondition(conditionType ExperimentConditionType, status v1.ConditionStatus, reason, message string) { + + newCond := newCondition(conditionType, status, reason, message) + currentCond := getCondition(exp, conditionType) + // Do nothing if condition doesn't change + if currentCond != nil && currentCond.Status == newCond.Status && currentCond.Reason == newCond.Reason { + return + } + + // Do not update lastTransitionTime if the status of the condition doesn't change. + if currentCond != nil && currentCond.Status == newCond.Status { + newCond.LastTransitionTime = currentCond.LastTransitionTime + } + + exp.removeCondition(conditionType) + exp.Status.Conditions = append(exp.Status.Conditions, newCond) +} + +func (exp *Experiment) MarkExperimentStatusCreated(reason, message string) { + exp.setCondition(ExperimentCreated, v1.ConditionTrue, reason, message) +} + +func (exp *Experiment) MarkExperimentStatusRunning(reason, message string) { + //exp.removeCondition(ExperimentRestarting) + exp.setCondition(ExperimentRunning, v1.ConditionTrue, reason, message) +} + +func (exp *Experiment) MarkExperimentStatusSucceeded(reason, message string) { + currentCond := getCondition(exp, ExperimentRunning) + if currentCond != nil { + exp.setCondition(ExperimentRunning, v1.ConditionFalse, currentCond.Reason, currentCond.Message) + } + exp.setCondition(ExperimentSucceeded, v1.ConditionTrue, reason, message) + +} + +func (exp *Experiment) MarkExperimentStatusFailed(reason, message string) { + currentCond := getCondition(exp, ExperimentRunning) + if currentCond != nil { + exp.setCondition(ExperimentRunning, v1.ConditionFalse, currentCond.Reason, currentCond.Message) + } + exp.setCondition(ExperimentFailed, v1.ConditionTrue, reason, message) +} diff --git a/pkg/api/operators/apis/experiment/v1alpha2/zz_generated.deepcopy.go b/pkg/api/operators/apis/experiment/v1alpha2/zz_generated.deepcopy.go index 344aed137ce..7bc246dcc47 100644 --- a/pkg/api/operators/apis/experiment/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/api/operators/apis/experiment/v1alpha2/zz_generated.deepcopy.go @@ -186,6 +186,26 @@ func (in *ExperimentSpec) DeepCopyInto(out *ExperimentSpec) { *out = new(TrialTemplate) (*in).DeepCopyInto(*out) } + if in.ParallelTrialCount != nil { + in, out := &in.ParallelTrialCount, &out.ParallelTrialCount + *out = new(int) + **out = **in + } + if in.MaxTrialCount != nil { + in, out := &in.MaxTrialCount, &out.MaxTrialCount + *out = new(int) + **out = **in + } + if in.MaxFailedTrialCount != nil { + in, out := &in.MaxFailedTrialCount, &out.MaxFailedTrialCount + *out = new(int) + **out = **in + } + if in.NasConfig != nil { + in, out := &in.NasConfig, &out.NasConfig + *out = new(NasConfig) + (*in).DeepCopyInto(*out) + } return } @@ -277,6 +297,56 @@ func (in *GoTemplate) DeepCopy() *GoTemplate { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GraphConfig) DeepCopyInto(out *GraphConfig) { + *out = *in + if in.InputSizes != nil { + in, out := &in.InputSizes, &out.InputSizes + *out = make([]int32, len(*in)) + copy(*out, *in) + } + if in.OutputSizes != nil { + in, out := &in.OutputSizes, &out.OutputSizes + *out = make([]int32, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GraphConfig. +func (in *GraphConfig) DeepCopy() *GraphConfig { + if in == nil { + return nil + } + out := new(GraphConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NasConfig) DeepCopyInto(out *NasConfig) { + *out = *in + in.GraphConfig.DeepCopyInto(&out.GraphConfig) + if in.Operations != nil { + in, out := &in.Operations, &out.Operations + *out = make([]Operation, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NasConfig. +func (in *NasConfig) DeepCopy() *NasConfig { + if in == nil { + return nil + } + out := new(NasConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ObjectiveSpec) DeepCopyInto(out *ObjectiveSpec) { *out = *in @@ -298,6 +368,29 @@ func (in *ObjectiveSpec) DeepCopy() *ObjectiveSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Operation) DeepCopyInto(out *Operation) { + *out = *in + if in.Parameters != nil { + in, out := &in.Parameters, &out.Parameters + *out = make([]ParameterSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Operation. +func (in *Operation) DeepCopy() *Operation { + if in == nil { + return nil + } + out := new(Operation) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OptimalTrial) DeepCopyInto(out *OptimalTrial) { *out = *in diff --git a/pkg/api/operators/apis/trial/v1alpha2/doc.go b/pkg/api/operators/apis/trial/v1alpha2/doc.go index 898eaf7ca19..e872d1f6505 100644 --- a/pkg/api/operators/apis/trial/v1alpha2/doc.go +++ b/pkg/api/operators/apis/trial/v1alpha2/doc.go @@ -13,10 +13,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package v1alpha2 contains API Schema definitions for the experiment v1alpha2 API group +// Package v1alpha2 contains API Schema definitions for the trial v1alpha2 API group // +k8s:openapi-gen=true // +k8s:deepcopy-gen=package,register -// +k8s:conversion-gen=github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2 +// +k8s:conversion-gen=github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2 // +k8s:defaulter-gen=TypeMeta -// +groupName=experiment.kubeflow.org +// +kubebuilder:subresource:status +// +groupName=trial.kubeflow.org package v1alpha2 diff --git a/pkg/api/operators/apis/trial/v1alpha2/register.go b/pkg/api/operators/apis/trial/v1alpha2/register.go new file mode 100644 index 00000000000..f6c44535051 --- /dev/null +++ b/pkg/api/operators/apis/trial/v1alpha2/register.go @@ -0,0 +1,37 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package v1alpha2 contains API Schema definitions for the trial v1alpha2 API group +// +k8s:openapi-gen=true +// +k8s:deepcopy-gen=package,register +// +k8s:conversion-gen=github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2 +// +k8s:defaulter-gen=TypeMeta +// +kubebuilder:subresource:status +// +groupName=trials.kubeflow.org +package v1alpha2 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/runtime/scheme" +) + +var ( + // SchemeGroupVersion is group version used to register these objects + SchemeGroupVersion = schema.GroupVersion{Group: "kubeflow.org", Version: "v1alpha2"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme + SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion} + AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/pkg/api/operators/apis/trial/v1alpha2/trial_types.go b/pkg/api/operators/apis/trial/v1alpha2/trial_types.go index b1bd2ca0d5f..1d07e1efabf 100644 --- a/pkg/api/operators/apis/trial/v1alpha2/trial_types.go +++ b/pkg/api/operators/apis/trial/v1alpha2/trial_types.go @@ -95,9 +95,9 @@ type TrialCondition struct { type TrialConditionType string const ( - TrialPending TrialConditionType = "Pending" + TrialCreated TrialConditionType = "Created" TrialRunning TrialConditionType = "Running" - TrialCompleted TrialConditionType = "Completed" + TrialSucceeded TrialConditionType = "Succeeded" TrialKilled TrialConditionType = "Killed" TrialFailed TrialConditionType = "Failed" ) @@ -107,6 +107,7 @@ const ( // Represents the structure of a Trial resource. // +k8s:openapi-gen=true +// +kubebuilder:subresource:status type Trial struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` @@ -124,7 +125,6 @@ type TrialList struct { Items []Trial `json:"items"` } -// TODO: Enable this later during API implementation. -//func init() { -// SchemeBuilder.Register(&Trial{}, &TrialList{}) -//} +func init() { + SchemeBuilder.Register(&Trial{}, &TrialList{}) +} diff --git a/pkg/api/operators/apis/trial/v1alpha2/util.go b/pkg/api/operators/apis/trial/v1alpha2/util.go new file mode 100644 index 00000000000..2fd19688dc3 --- /dev/null +++ b/pkg/api/operators/apis/trial/v1alpha2/util.go @@ -0,0 +1,137 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha2 + +import ( + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func getCondition(trial *Trial, condType TrialConditionType) *TrialCondition { + for _, condition := range trial.Status.Conditions { + if condition.Type == condType { + return &condition + } + } + return nil +} + +func hasCondition(trial *Trial, condType TrialConditionType) bool { + cond := getCondition(trial, condType) + if cond != nil && cond.Status == v1.ConditionTrue { + return true + } + return false +} + +func (trial *Trial) removeCondition(condType TrialConditionType) { + var newConditions []TrialCondition + for _, c := range trial.Status.Conditions { + + if c.Type == condType { + continue + } + + newConditions = append(newConditions, c) + } + trial.Status.Conditions = newConditions +} + +func newCondition(conditionType TrialConditionType, status v1.ConditionStatus, reason, message string) TrialCondition { + return TrialCondition{ + Type: conditionType, + Status: status, + LastUpdateTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: message, + } +} + +func (trial *Trial) IsCreated() bool { + return hasCondition(trial, TrialCreated) +} + +func (trial *Trial) IsRunning() bool { + return hasCondition(trial, TrialRunning) +} + +func (trial *Trial) IsSucceeded() bool { + return hasCondition(trial, TrialSucceeded) +} + +func (trial *Trial) IsFailed() bool { + return hasCondition(trial, TrialFailed) +} + +func (trial *Trial) IsKilled() bool { + return hasCondition(trial, TrialKilled) +} + +func (trial *Trial) IsCompleted() bool { + return trial.IsSucceeded() || trial.IsFailed() || trial.IsKilled() +} + +func (trial *Trial) setCondition(conditionType TrialConditionType, status v1.ConditionStatus, reason, message string) { + + newCond := newCondition(conditionType, status, reason, message) + currentCond := getCondition(trial, conditionType) + // Do nothing if condition doesn't change + if currentCond != nil && currentCond.Status == newCond.Status && currentCond.Reason == newCond.Reason { + return + } + + // Do not update lastTransitionTime if the status of the condition doesn't change. + if currentCond != nil && currentCond.Status == newCond.Status { + newCond.LastTransitionTime = currentCond.LastTransitionTime + } + + trial.removeCondition(conditionType) + trial.Status.Conditions = append(trial.Status.Conditions, newCond) +} + +func (trial *Trial) MarkTrialStatusCreated(reason, message string) { + trial.setCondition(TrialCreated, v1.ConditionTrue, reason, message) +} + +func (trial *Trial) MarkTrialStatusRunning(reason, message string) { + trial.setCondition(TrialRunning, v1.ConditionTrue, reason, message) +} + +func (trial *Trial) MarkTrialStatusSucceeded(reason, message string) { + currentCond := getCondition(trial, TrialRunning) + if currentCond != nil { + trial.setCondition(TrialRunning, v1.ConditionFalse, currentCond.Reason, currentCond.Message) + } + trial.setCondition(TrialSucceeded, v1.ConditionTrue, reason, message) + +} + +func (trial *Trial) MarkTrialStatusFailed(reason, message string) { + currentCond := getCondition(trial, TrialRunning) + if currentCond != nil { + trial.setCondition(TrialRunning, v1.ConditionFalse, currentCond.Reason, currentCond.Message) + } + trial.setCondition(TrialFailed, v1.ConditionTrue, reason, message) +} + +func (trial *Trial) MarkTrialStatusKilled(reason, message string) { + currentCond := getCondition(trial, TrialRunning) + if currentCond != nil { + trial.setCondition(TrialRunning, v1.ConditionFalse, currentCond.Reason, currentCond.Message) + } + trial.setCondition(TrialKilled, v1.ConditionTrue, reason, message) +} diff --git a/pkg/controller/v1alpha2/experiment/experiment_controller.go b/pkg/controller/v1alpha2/experiment/experiment_controller.go index db67dc60105..92ae2d677bb 100644 --- a/pkg/controller/v1alpha2/experiment/experiment_controller.go +++ b/pkg/controller/v1alpha2/experiment/experiment_controller.go @@ -19,8 +19,9 @@ package experiment import ( "context" - experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -29,6 +30,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" "sigs.k8s.io/controller-runtime/pkg/source" + + experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" + trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" + "github.com/kubeflow/katib/pkg/controller/v1alpha2/experiment/util" ) var log = logf.Log.WithName("controller") @@ -63,6 +68,18 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { return err } + // Watch for trials for the experiments + err = c.Watch( + &source.Kind{Type: &trialsv1alpha2.Trial{}}, + &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &experimentsv1alpha2.Experiment{}, + }) + + if err != nil { + return err + } + return nil } @@ -81,6 +98,7 @@ type ReconcileExperiment struct { func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Result, error) { // Fetch the Experiment instance instance := &experimentsv1alpha2.Experiment{} + requeue := false err := r.Get(context.TODO(), request.NamespacedName, instance) if err != nil { if errors.IsNotFound(err) { @@ -91,6 +109,113 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re // Error reading the object - requeue the request. return reconcile.Result{}, err } + original := instance.DeepCopy() + + if instance.IsCompleted() { + + return reconcile.Result{}, nil + + } + if !instance.IsCreated() { + //Experiment not created in DB + err = util.CreateExperimentinDB(instance) + if err != nil { + return reconcile.Result{}, err + } + + if instance.Status.StartTime == nil { + now := metav1.Now() + instance.Status.StartTime = &now + } + msg := "Experiment is created" + instance.MarkExperimentStatusCreated(util.ExperimentCreatedReason, msg) + requeue = true + } else { + // Experiment already created in DB + err := r.ReconcileExperiment(instance) + if err != nil { + return reconcile.Result{}, err + } + } + + if !equality.Semantic.DeepEqual(original.Status, instance.Status) { + //assuming that only status change + err = util.UpdateExperimentStatusinDB(instance) + if err != nil { + return reconcile.Result{}, err + } + err = r.Status().Update(context.TODO(), instance) + if err != nil { + return reconcile.Result{}, err + } + } + + return reconcile.Result{Requeue: requeue}, nil +} + +func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha2.Experiment) error { + var err error + trials := &trialsv1alpha2.TrialList{} + labels := map[string]string{"experiment": instance.Name} + lo := &client.ListOptions{} + lo.MatchingLabels(labels).InNamespace(instance.Namespace) + + err = r.List(context.TODO(), lo, trials) + if err != nil { + return err + } + util.UpdateExperimentStatus(instance, trials) + + reconcileRequired := !instance.IsCompleted() + if err != nil { + return err + } + if reconcileRequired { + r.ReconcileTrials(instance) + } + return err +} + +func (r *ReconcileExperiment) ReconcileTrials(instance *experimentsv1alpha2.Experiment) error { + var err error + parallelCount := 0 + + if instance.Spec.ParallelTrialCount != nil { + parallelCount = *instance.Spec.ParallelTrialCount + } else { + parallelCount = 3 + } + activeCount := instance.Status.TrialsRunning + succeededCount := instance.Status.TrialsSucceeded + + if activeCount > parallelCount { + deleteCount := activeCount - parallelCount + if deleteCount > 0 { + //delete 'deleteCount' number of trails. Sort them? + } + + } else if activeCount < parallelCount { + requiredActiveCount := 0 + if instance.Spec.MaxTrialCount == nil { + requiredActiveCount = parallelCount + } else { + requiredActiveCount = *instance.Spec.MaxTrialCount - succeededCount + if requiredActiveCount > parallelCount { + requiredActiveCount = parallelCount + } + } + + addCount := requiredActiveCount - activeCount + if addCount < 0 { + log.Info("Invalid setting", "requiredActiveCount", requiredActiveCount, "MaxTrialCount", + *instance.Spec.MaxTrialCount, "SucceededCount", succeededCount) + addCount = 0 + } + + //create "addCount" number of trials + + } + + return err - return reconcile.Result{}, nil } diff --git a/pkg/controller/v1alpha2/experiment/util/api_util.go b/pkg/controller/v1alpha2/experiment/util/api_util.go new file mode 100644 index 00000000000..9e074205ea1 --- /dev/null +++ b/pkg/controller/v1alpha2/experiment/util/api_util.go @@ -0,0 +1,32 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + //v1 "k8s.io/api/core/v1" + + experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" +) + +func CreateExperimentinDB(instance *experimentsv1alpha2.Experiment) error { + + return nil +} + +func UpdateExperimentStatusinDB(instance *experimentsv1alpha2.Experiment) error { + + return nil +} diff --git a/pkg/controller/v1alpha2/experiment/util/status_util.go b/pkg/controller/v1alpha2/experiment/util/status_util.go new file mode 100644 index 00000000000..2e2d0bea674 --- /dev/null +++ b/pkg/controller/v1alpha2/experiment/util/status_util.go @@ -0,0 +1,135 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + //v1 "k8s.io/api/core/v1" + + logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + + experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" + trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" +) + +var log = logf.Log.WithName("controller") + +const ( + ExperimentCreatedReason = "ExperimentCreated" + ExperimentRunningReason = "ExperimentRunning" + ExperimentSucceededReason = "ExperimentSucceeded" + ExperimentFailedReason = "ExperimentFailed" + ExperimentKilledReason = "ExperimentKilled" +) + +func UpdateExperimentStatus(instance *experimentsv1alpha2.Experiment, trials *trialsv1alpha2.TrialList) { + + isObjectiveGoalReached := updateTrialsSummary(instance, trials) + + updateExperimentStatusCondition(instance, isObjectiveGoalReached) + +} + +func updateTrialsSummary(instance *experimentsv1alpha2.Experiment, trials *trialsv1alpha2.TrialList) bool { + + var trialsPending, trialsRunning, trialsSucceeded, trialsFailed, trialsKilled int + var bestTrialIndex int + var bestTrialValue float64 + isObjectiveGoalReached := false + objectiveMetricName := instance.Spec.Objective.ObjectiveMetricName + objectiveValueGoal := instance.Spec.Objective.Goal + objectiveType := instance.Spec.Objective.Type + for index, trial := range trials.Items { + if trial.IsKilled() { + trialsKilled++ + } else if trial.IsFailed() { + trialsFailed++ + } else if trial.IsSucceeded() { + trialsSucceeded++ + } else if trial.IsRunning() { + trialsRunning++ + } else { + trialsPending++ + } + + for _, metric := range trial.Status.Observation.Metrics { + if objectiveMetricName == metric.Name { + if objectiveType == experimentsv1alpha2.ObjectiveTypeMinimize { + if bestTrialValue < metric.Value { + bestTrialValue = metric.Value + bestTrialIndex = index + } + if bestTrialValue <= objectiveValueGoal { + isObjectiveGoalReached = true + } + } else if objectiveType == experimentsv1alpha2.ObjectiveTypeMaximize { + if bestTrialValue > metric.Value { + bestTrialValue = metric.Value + bestTrialIndex = index + } + if bestTrialValue >= objectiveValueGoal { + isObjectiveGoalReached = true + } + } + } + } + } + if len(trials.Items) > 0 { + instance.Status.TrialsPending = trialsPending + instance.Status.TrialsRunning = trialsRunning + instance.Status.TrialsSucceeded = trialsSucceeded + instance.Status.TrialsFailed = trialsFailed + instance.Status.TrialsKilled = trialsKilled + + bestTrial := trials.Items[bestTrialIndex] + + instance.Status.CurrentOptimalTrial.ParameterAssignments = []trialsv1alpha2.ParameterAssignment{} + for _, parameterAssigment := range bestTrial.Spec.ParameterAssignments { + instance.Status.CurrentOptimalTrial.ParameterAssignments = append(instance.Status.CurrentOptimalTrial.ParameterAssignments, parameterAssigment) + } + + instance.Status.CurrentOptimalTrial.Observation.Metrics = []trialsv1alpha2.Metric{} + for _, metric := range bestTrial.Status.Observation.Metrics { + instance.Status.CurrentOptimalTrial.Observation.Metrics = append(instance.Status.CurrentOptimalTrial.Observation.Metrics, metric) + } + } + return isObjectiveGoalReached +} + +func updateExperimentStatusCondition(instance *experimentsv1alpha2.Experiment, isObjectiveGoalReached bool) { + + completedTrialsCount := instance.Status.TrialsSucceeded + instance.Status.TrialsFailed + instance.Status.TrialsKilled + failedTrialsCount := instance.Status.TrialsFailed + + if (instance.Spec.MaxTrialCount != nil) && (completedTrialsCount >= *instance.Spec.MaxTrialCount) { + msg := "Experiment has succeeded" + instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg) + return + } + + if (instance.Spec.MaxFailedTrialCount != nil) && (failedTrialsCount >= *instance.Spec.MaxFailedTrialCount) { + msg := "Experiment has failed" + instance.MarkExperimentStatusFailed(ExperimentFailedReason, msg) + return + } + + if isObjectiveGoalReached { + msg := "Experiment has succeeded" + instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg) + return + } + msg := "Experiment is running" + instance.MarkExperimentStatusRunning(ExperimentRunningReason, msg) +} diff --git a/pkg/controller/v1alpha2/trial/trial_controller.go b/pkg/controller/v1alpha2/trial/trial_controller.go index 537c4836545..3ac8550723a 100644 --- a/pkg/controller/v1alpha2/trial/trial_controller.go +++ b/pkg/controller/v1alpha2/trial/trial_controller.go @@ -17,18 +17,28 @@ limitations under the License. package trial import ( + "bytes" "context" - trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + k8syaml "k8s.io/apimachinery/pkg/util/yaml" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" "sigs.k8s.io/controller-runtime/pkg/source" + + trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" + "github.com/kubeflow/katib/pkg/controller/v1alpha2/trial/util" ) var log = logf.Log.WithName("controller") @@ -81,6 +91,7 @@ type ReconcileTrial struct { func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, error) { // Fetch the Trial instance instance := &trialsv1alpha2.Trial{} + requeue := false err := r.Get(context.TODO(), request.NamespacedName, instance) if err != nil { if errors.IsNotFound(err) { @@ -92,5 +103,120 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, return reconcile.Result{}, err } - return reconcile.Result{}, nil + original := instance.DeepCopy() + + if instance.IsCompleted() { + + return reconcile.Result{}, nil + + } + if !instance.IsCreated() { + //Trial not created in DB + err = util.CreateTrialinDB(instance) + if err != nil { + return reconcile.Result{}, err + } + if instance.Status.StartTime == nil { + now := metav1.Now() + instance.Status.StartTime = &now + } + msg := "Trial is created" + instance.MarkTrialStatusCreated(util.TrialCreatedReason, msg) + requeue = true + + } else { + // Trial already created in DB + err := r.reconcileTrial(instance) + if err != nil { + return reconcile.Result{}, err + } + } + + if !equality.Semantic.DeepEqual(original.Status, instance.Status) { + //assuming that only status change + err = util.UpdateTrialStatusinDB(instance) + if err != nil { + return reconcile.Result{}, err + } + err = r.Status().Update(context.TODO(), instance) + if err != nil { + return reconcile.Result{}, err + } + } + + return reconcile.Result{Requeue: requeue}, nil +} + +func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha2.Trial) error { + + var err error + desiredJob, err := r.getDesiredJobSpec(instance) + if err != nil { + log.Info("Error in getting Job Spec from instance") + return err + } + + deployedJob, err := r.reconcileJob(instance, desiredJob) + if err != nil { + return err + } + + //Job already exists + //TODO Can desired Spec differ from deployedSpec? + if deployedJob != nil { + if err = util.UpdateTrialStatusCondition(instance, deployedJob); err != nil { + return err + } + if err = util.UpdateTrialStatusObservation(instance, deployedJob); err != nil { + return err + } + } + return nil +} + +func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha2.Trial, desiredJob *unstructured.Unstructured) (*unstructured.Unstructured, error) { + + var err error + apiVersion := desiredJob.GetAPIVersion() + kind := desiredJob.GetKind() + gvk := schema.FromAPIVersionAndKind(apiVersion, kind) + + deployedJob := &unstructured.Unstructured{} + deployedJob.SetGroupVersionKind(gvk) + err = r.Get(context.TODO(), types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, deployedJob) + if err != nil { + if errors.IsNotFound(err) { + log.Info("Creating Job", "namespace", instance.Namespace, "name", instance.Name, "kind", kind) + err = r.Create(context.TODO(), desiredJob) + if err != nil { + log.Info("Error in creating job: %v ", err) + return nil, err + } + } else { + return nil, err + } + } + + //TODO create Metric colletor + + msg := "Trial is running" + instance.MarkTrialStatusRunning(util.TrialRunningReason, msg) + return deployedJob, nil +} + +func (r *ReconcileTrial) getDesiredJobSpec(instance *trialsv1alpha2.Trial) (*unstructured.Unstructured, error) { + buf := bytes.NewBufferString(instance.Spec.RunSpec) + bufSize := 1024 + + desiredJobSpec := &unstructured.Unstructured{} + if err := k8syaml.NewYAMLOrJSONDecoder(buf, bufSize).Decode(desiredJobSpec); err != nil { + log.Info("Yaml decode error %v", err) + return nil, err + } + if err := controllerutil.SetControllerReference(instance, desiredJobSpec, r.scheme); err != nil { + log.Info("SetControllerReference error %v", err) + return nil, err + } + + return desiredJobSpec, nil } diff --git a/pkg/controller/v1alpha2/trial/util/api_util.go b/pkg/controller/v1alpha2/trial/util/api_util.go new file mode 100644 index 00000000000..52a763894b2 --- /dev/null +++ b/pkg/controller/v1alpha2/trial/util/api_util.go @@ -0,0 +1,37 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + //v1 "k8s.io/api/core/v1" + + trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" +) + +func CreateTrialinDB(instance *trialsv1alpha2.Trial) error { + + return nil +} + +func UpdateTrialStatusinDB(instance *trialsv1alpha2.Trial) error { + + return nil +} + +func GetTrialObservation(instance *trialsv1alpha2.Trial) error { + + return nil +} diff --git a/pkg/controller/v1alpha2/trial/util/status_util.go b/pkg/controller/v1alpha2/trial/util/status_util.go new file mode 100644 index 00000000000..677a875ecfe --- /dev/null +++ b/pkg/controller/v1alpha2/trial/util/status_util.go @@ -0,0 +1,93 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + //v1 "k8s.io/api/core/v1" + + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + + trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" + commonv1beta1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta1" +) + +var log = logf.Log.WithName("controller") + +const ( + DefaultJobKind = "Job" + TrialCreatedReason = "TrialCreated" + TrialRunningReason = "TrialRunning" + TrialSucceededReason = "TrialSucceeded" + TrialFailedReason = "TrialFailed" + TrialKilledReason = "TrialKilled" +) + +func UpdateTrialStatusCondition(instance *trialsv1alpha2.Trial, deployedJob *unstructured.Unstructured) error { + + kind := deployedJob.GetKind() + status, ok, unerr := unstructured.NestedFieldCopy(deployedJob.Object, "status") + + if ok { + statusMap := status.(map[string]interface{}) + switch kind { + + case DefaultJobKind: + jobStatus := batchv1.JobStatus{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus) + if err != nil { + log.Info("Error in converting unstructured to status: %v ", err) + return err + } + if jobStatus.Active == 0 && jobStatus.Succeeded > 0 { + msg := "Trial has succeeded" + instance.MarkTrialStatusSucceeded(TrialSucceededReason, msg) + } else if jobStatus.Failed > 0 { + msg := "Trial has failed" + instance.MarkTrialStatusFailed(TrialFailedReason, msg) + } + default: + jobStatus := commonv1beta1.JobStatus{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus) + + if err != nil { + log.Info("Error in converting unstructured to status: %v ", err) + return err + } + if len(jobStatus.Conditions) > 0 { + lc := jobStatus.Conditions[len(jobStatus.Conditions)-1] + if lc.Type == commonv1beta1.JobSucceeded { + msg := "Trial has succeeded" + instance.MarkTrialStatusSucceeded(TrialSucceededReason, msg) + } else if lc.Type == commonv1beta1.JobFailed { + msg := "Trial has failed" + instance.MarkTrialStatusFailed(TrialFailedReason, msg) + } + } + } + } else if unerr != nil { + return unerr + } + return nil +} + +func UpdateTrialStatusObservation(instance *trialsv1alpha2.Trial, deployedJob *unstructured.Unstructured) error { + + // read GetObservationLog call and update observation field + return nil +} From cde02872ec412f0ef02d3a005c857b57b481318b Mon Sep 17 00:00:00 2001 From: Johnu George Date: Sun, 21 Apr 2019 13:46:49 +0530 Subject: [PATCH 2/5] Adding logs --- cmd/katib-controller/v1alpha2/main.go | 12 +- .../apis/addtoscheme_katib_v1alpha2.go | 26 ++++ .../experiment/v1alpha2/experiment_types.go | 4 +- .../apis/experiment/v1alpha2/register.go | 7 +- .../operators/apis/trial/v1alpha2/register.go | 7 +- .../experiment/experiment_controller.go | 34 +++-- .../v1alpha2/experiment/util/api_util.go | 4 +- .../v1alpha2/experiment/util/status_util.go | 128 +++++++++++------- .../v1alpha2/trial/trial_controller.go | 48 +++++-- .../v1alpha2/trial/util/api_util.go | 4 +- .../v1alpha2/trial/util/status_util.go | 7 +- 11 files changed, 197 insertions(+), 84 deletions(-) create mode 100644 pkg/api/operators/apis/addtoscheme_katib_v1alpha2.go diff --git a/cmd/katib-controller/v1alpha2/main.go b/cmd/katib-controller/v1alpha2/main.go index 2d8fdb154e7..b6295202651 100644 --- a/cmd/katib-controller/v1alpha2/main.go +++ b/cmd/katib-controller/v1alpha2/main.go @@ -15,8 +15,8 @@ limitations under the License. */ /* - StudyJobController is a controller (operator) for StudyJob - StudyJobController create and watch workers and metricscollectors. + Katib-controller is a controller (operator) for Experiments and Trials + Katib-controller create and watch workers and metricscollectors. The workers and metricscollectors are generated from template defined ConfigMap. The workers and metricscollectors are kubernetes object. The default object is a Job and CronJob. */ @@ -30,10 +30,12 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" + logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" "sigs.k8s.io/controller-runtime/pkg/runtime/signals" ) func main() { + logf.SetLogger(logf.ZapLogger(false)) // Get a config to talk to the apiserver cfg, err := config.GetConfig() if err != nil { @@ -41,7 +43,7 @@ func main() { log.Fatal(err) } - // Create a new StudyJobController to provide shared dependencies and start components + // Create a new katib controller to provide shared dependencies and start components mgr, err := manager.New(cfg, manager.Options{}) if err != nil { log.Printf("manager.New") @@ -56,7 +58,7 @@ func main() { log.Fatal(err) } - // Setup StudyJobController + // Setup katib controller if err := controller.AddToManager(mgr); err != nil { log.Printf("controller.AddToManager(mgr)") log.Fatal(err) @@ -64,6 +66,6 @@ func main() { log.Printf("Starting the Cmd.") - // Starting the StudyJobController + // Starting the katib controller log.Fatal(mgr.Start(signals.SetupSignalHandler())) } diff --git a/pkg/api/operators/apis/addtoscheme_katib_v1alpha2.go b/pkg/api/operators/apis/addtoscheme_katib_v1alpha2.go new file mode 100644 index 00000000000..6bbd44cb0e8 --- /dev/null +++ b/pkg/api/operators/apis/addtoscheme_katib_v1alpha2.go @@ -0,0 +1,26 @@ +/* + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apis + +import ( + experiments "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" + trials "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" +) + +func init() { + // Register the types with the Scheme so the components can map objects to GroupVersionKinds and back + AddToSchemes = append(AddToSchemes, experiments.SchemeBuilder.AddToScheme, trials.SchemeBuilder.AddToScheme) +} diff --git a/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go b/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go index 6429b8f9ecd..89b7b4d9c53 100644 --- a/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go +++ b/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go @@ -160,7 +160,7 @@ type FeasibleSpace struct { type ObjectiveSpec struct { Type ObjectiveType `json:"type,omitempty"` - Goal float64 `json:"goal,omitempty"` + Goal *float64 `json:"goal,omitempty"` ObjectiveMetricName string `json:"objectiveMetricName,omitempty"` // This can be empty if we only care about the objective metric. // Note: If we adopt a push instead of pull mechanism, this can be omitted completely. @@ -238,7 +238,7 @@ type NasConfig struct { // GraphConfig contains a config of DAG type GraphConfig struct { - NumLayers int32 `json:"numLayers,omitempty"` + NumLayers *int32 `json:"numLayers,omitempty"` InputSizes []int32 `json:"inputSizes,omitempty"` OutputSizes []int32 `json:"outputSizes,omitempty"` } diff --git a/pkg/api/operators/apis/experiment/v1alpha2/register.go b/pkg/api/operators/apis/experiment/v1alpha2/register.go index 818623de0d5..f195fcc3e99 100644 --- a/pkg/api/operators/apis/experiment/v1alpha2/register.go +++ b/pkg/api/operators/apis/experiment/v1alpha2/register.go @@ -27,9 +27,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/runtime/scheme" ) +const ( + Group = "kubeflow.org" + Version = "v1alpha2" +) + var ( // SchemeGroupVersion is group version used to register these objects - SchemeGroupVersion = schema.GroupVersion{Group: "kubeflow.org", Version: "v1alpha2"} + SchemeGroupVersion = schema.GroupVersion{Group: Group, Version: Version} // SchemeBuilder is used to add go types to the GroupVersionKind scheme SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion} diff --git a/pkg/api/operators/apis/trial/v1alpha2/register.go b/pkg/api/operators/apis/trial/v1alpha2/register.go index f6c44535051..2ae2627dc09 100644 --- a/pkg/api/operators/apis/trial/v1alpha2/register.go +++ b/pkg/api/operators/apis/trial/v1alpha2/register.go @@ -27,9 +27,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/runtime/scheme" ) +const ( + Group = "kubeflow.org" + Version = "v1alpha2" +) + var ( // SchemeGroupVersion is group version used to register these objects - SchemeGroupVersion = schema.GroupVersion{Group: "kubeflow.org", Version: "v1alpha2"} + SchemeGroupVersion = schema.GroupVersion{Group: Group, Version: Version} // SchemeBuilder is used to add go types to the GroupVersionKind scheme SchemeBuilder = &scheme.Builder{GroupVersion: SchemeGroupVersion} diff --git a/pkg/controller/v1alpha2/experiment/experiment_controller.go b/pkg/controller/v1alpha2/experiment/experiment_controller.go index 92ae2d677bb..c67e6356740 100644 --- a/pkg/controller/v1alpha2/experiment/experiment_controller.go +++ b/pkg/controller/v1alpha2/experiment/experiment_controller.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -36,7 +37,7 @@ import ( "github.com/kubeflow/katib/pkg/controller/v1alpha2/experiment/util" ) -var log = logf.Log.WithName("controller") +var log = logf.Log.WithName("experiment-controller") /** * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller @@ -59,12 +60,14 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { // Create a new controller c, err := controller.New("experiment-controller", mgr, controller.Options{Reconciler: r}) if err != nil { + log.Error(err, "Failed to create experiment controller") return err } // Watch for changes to Experiment err = c.Watch(&source.Kind{Type: &experimentsv1alpha2.Experiment{}}, &handler.EnqueueRequestForObject{}) if err != nil { + log.Error(err, "Experiment watch failed") return err } @@ -77,9 +80,11 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { }) if err != nil { + log.Error(err, "Trial watch failed") return err } + log.Info("Experiment controller created") return nil } @@ -97,6 +102,7 @@ type ReconcileExperiment struct { // +kubebuilder:rbac:groups=experiments.kubeflow.org,resources=experiments/status,verbs=get;update;patch func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Result, error) { // Fetch the Experiment instance + log = log.WithValues("Experiment", request.NamespacedName) instance := &experimentsv1alpha2.Experiment{} requeue := false err := r.Get(context.TODO(), request.NamespacedName, instance) @@ -107,6 +113,7 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re return reconcile.Result{}, nil } // Error reading the object - requeue the request. + log.Error(err, "Experiment Get error") return reconcile.Result{}, err } original := instance.DeepCopy() @@ -118,8 +125,9 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re } if !instance.IsCreated() { //Experiment not created in DB - err = util.CreateExperimentinDB(instance) + err = util.CreateExperimentInDB(instance) if err != nil { + log.Error(err, "Create experiment in DB error") return reconcile.Result{}, err } @@ -134,18 +142,21 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re // Experiment already created in DB err := r.ReconcileExperiment(instance) if err != nil { + log.Error(err, "Reconcile experiment error") return reconcile.Result{}, err } } if !equality.Semantic.DeepEqual(original.Status, instance.Status) { //assuming that only status change - err = util.UpdateExperimentStatusinDB(instance) + err = util.UpdateExperimentStatusInDB(instance) if err != nil { + log.Error(err, "Update experiment status in DB error") return reconcile.Result{}, err } err = r.Status().Update(context.TODO(), instance) if err != nil { + log.Error(err, "Update experiment instance status error") return reconcile.Result{}, err } } @@ -154,7 +165,9 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re } func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha2.Experiment) error { + var err error + log := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) trials := &trialsv1alpha2.TrialList{} labels := map[string]string{"experiment": instance.Name} lo := &client.ListOptions{} @@ -162,14 +175,17 @@ func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha2. err = r.List(context.TODO(), lo, trials) if err != nil { + log.Error(err, "Trial List error") return err } - util.UpdateExperimentStatus(instance, trials) - - reconcileRequired := !instance.IsCompleted() - if err != nil { - return err + if len(trials.Items) > 0 { + err := util.UpdateExperimentStatus(instance, trials) + if err != nil { + log.Error(err, "Update experiment status error") + return err + } } + reconcileRequired := !instance.IsCompleted() if reconcileRequired { r.ReconcileTrials(instance) } @@ -177,7 +193,9 @@ func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha2. } func (r *ReconcileExperiment) ReconcileTrials(instance *experimentsv1alpha2.Experiment) error { + var err error + log := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) parallelCount := 0 if instance.Spec.ParallelTrialCount != nil { diff --git a/pkg/controller/v1alpha2/experiment/util/api_util.go b/pkg/controller/v1alpha2/experiment/util/api_util.go index 9e074205ea1..348e822c3d2 100644 --- a/pkg/controller/v1alpha2/experiment/util/api_util.go +++ b/pkg/controller/v1alpha2/experiment/util/api_util.go @@ -21,12 +21,12 @@ import ( experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" ) -func CreateExperimentinDB(instance *experimentsv1alpha2.Experiment) error { +func CreateExperimentInDB(instance *experimentsv1alpha2.Experiment) error { return nil } -func UpdateExperimentStatusinDB(instance *experimentsv1alpha2.Experiment) error { +func UpdateExperimentStatusInDB(instance *experimentsv1alpha2.Experiment) error { return nil } diff --git a/pkg/controller/v1alpha2/experiment/util/status_util.go b/pkg/controller/v1alpha2/experiment/util/status_util.go index 2e2d0bea674..734c42c49d3 100644 --- a/pkg/controller/v1alpha2/experiment/util/status_util.go +++ b/pkg/controller/v1alpha2/experiment/util/status_util.go @@ -16,15 +16,16 @@ limitations under the License. package util import ( - //v1 "k8s.io/api/core/v1" + "errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" ) -var log = logf.Log.WithName("controller") +var log = logf.Log.WithName("experiment-status-util") const ( ExperimentCreatedReason = "ExperimentCreated" @@ -34,23 +35,39 @@ const ( ExperimentKilledReason = "ExperimentKilled" ) -func UpdateExperimentStatus(instance *experimentsv1alpha2.Experiment, trials *trialsv1alpha2.TrialList) { +func UpdateExperimentStatus(instance *experimentsv1alpha2.Experiment, trials *trialsv1alpha2.TrialList) error { - isObjectiveGoalReached := updateTrialsSummary(instance, trials) + isObjectiveGoalReached, err := updateTrialsSummary(instance, trials) + if err != nil { + return err + } updateExperimentStatusCondition(instance, isObjectiveGoalReached) + return nil } -func updateTrialsSummary(instance *experimentsv1alpha2.Experiment, trials *trialsv1alpha2.TrialList) bool { +func updateTrialsSummary(instance *experimentsv1alpha2.Experiment, trials *trialsv1alpha2.TrialList) (bool, error) { var trialsPending, trialsRunning, trialsSucceeded, trialsFailed, trialsKilled int var bestTrialIndex int var bestTrialValue float64 isObjectiveGoalReached := false - objectiveMetricName := instance.Spec.Objective.ObjectiveMetricName - objectiveValueGoal := instance.Spec.Objective.Goal + objectiveValueGoal := *instance.Spec.Objective.Goal objectiveType := instance.Spec.Objective.Type + objectiveMetricName := instance.Spec.Objective.ObjectiveMetricName + + if objectiveMetricValue := getObjectiveMetricValue(trials.Items[0], objectiveMetricName); objectiveMetricValue != nil { + bestTrialValue = *objectiveMetricValue + if bestTrialValue <= objectiveValueGoal { + isObjectiveGoalReached = true + } + } else { + //may be log + err := errors.New(string(metav1.StatusReasonNotFound)) + return isObjectiveGoalReached, err + } + for index, trial := range trials.Items { if trial.IsKilled() { trialsKilled++ @@ -64,48 +81,58 @@ func updateTrialsSummary(instance *experimentsv1alpha2.Experiment, trials *trial trialsPending++ } - for _, metric := range trial.Status.Observation.Metrics { - if objectiveMetricName == metric.Name { - if objectiveType == experimentsv1alpha2.ObjectiveTypeMinimize { - if bestTrialValue < metric.Value { - bestTrialValue = metric.Value - bestTrialIndex = index - } - if bestTrialValue <= objectiveValueGoal { - isObjectiveGoalReached = true - } - } else if objectiveType == experimentsv1alpha2.ObjectiveTypeMaximize { - if bestTrialValue > metric.Value { - bestTrialValue = metric.Value - bestTrialIndex = index - } - if bestTrialValue >= objectiveValueGoal { - isObjectiveGoalReached = true - } - } + objectiveMetricValue := getObjectiveMetricValue(trial, objectiveMetricName) + if objectiveMetricValue == nil { + //may be log + err := errors.New(string(metav1.StatusReasonNotFound)) + return isObjectiveGoalReached, err + } + + if objectiveType == experimentsv1alpha2.ObjectiveTypeMinimize { + if *objectiveMetricValue < bestTrialValue { + bestTrialValue = *objectiveMetricValue + bestTrialIndex = index + } + if bestTrialValue <= objectiveValueGoal { + isObjectiveGoalReached = true + } + } else if objectiveType == experimentsv1alpha2.ObjectiveTypeMaximize { + if *objectiveMetricValue > bestTrialValue { + bestTrialValue = *objectiveMetricValue + bestTrialIndex = index + } + if bestTrialValue >= objectiveValueGoal { + isObjectiveGoalReached = true } } } - if len(trials.Items) > 0 { - instance.Status.TrialsPending = trialsPending - instance.Status.TrialsRunning = trialsRunning - instance.Status.TrialsSucceeded = trialsSucceeded - instance.Status.TrialsFailed = trialsFailed - instance.Status.TrialsKilled = trialsKilled - - bestTrial := trials.Items[bestTrialIndex] - - instance.Status.CurrentOptimalTrial.ParameterAssignments = []trialsv1alpha2.ParameterAssignment{} - for _, parameterAssigment := range bestTrial.Spec.ParameterAssignments { - instance.Status.CurrentOptimalTrial.ParameterAssignments = append(instance.Status.CurrentOptimalTrial.ParameterAssignments, parameterAssigment) - } + instance.Status.TrialsPending = trialsPending + instance.Status.TrialsRunning = trialsRunning + instance.Status.TrialsSucceeded = trialsSucceeded + instance.Status.TrialsFailed = trialsFailed + instance.Status.TrialsKilled = trialsKilled + + bestTrial := trials.Items[bestTrialIndex] - instance.Status.CurrentOptimalTrial.Observation.Metrics = []trialsv1alpha2.Metric{} - for _, metric := range bestTrial.Status.Observation.Metrics { - instance.Status.CurrentOptimalTrial.Observation.Metrics = append(instance.Status.CurrentOptimalTrial.Observation.Metrics, metric) + instance.Status.CurrentOptimalTrial.ParameterAssignments = []trialsv1alpha2.ParameterAssignment{} + for _, parameterAssigment := range bestTrial.Spec.ParameterAssignments { + instance.Status.CurrentOptimalTrial.ParameterAssignments = append(instance.Status.CurrentOptimalTrial.ParameterAssignments, parameterAssigment) + } + + instance.Status.CurrentOptimalTrial.Observation.Metrics = []trialsv1alpha2.Metric{} + for _, metric := range bestTrial.Status.Observation.Metrics { + instance.Status.CurrentOptimalTrial.Observation.Metrics = append(instance.Status.CurrentOptimalTrial.Observation.Metrics, metric) + } + return isObjectiveGoalReached, nil +} + +func getObjectiveMetricValue(trial trialsv1alpha2.Trial, objectiveMetricName string) *float64 { + for _, metric := range trial.Status.Observation.Metrics { + if objectiveMetricName == metric.Name { + return &metric.Value } } - return isObjectiveGoalReached + return nil } func updateExperimentStatusCondition(instance *experimentsv1alpha2.Experiment, isObjectiveGoalReached bool) { @@ -113,23 +140,24 @@ func updateExperimentStatusCondition(instance *experimentsv1alpha2.Experiment, i completedTrialsCount := instance.Status.TrialsSucceeded + instance.Status.TrialsFailed + instance.Status.TrialsKilled failedTrialsCount := instance.Status.TrialsFailed + if isObjectiveGoalReached { + msg := "Experiment has succeeded because Objective goal has reached" + instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg) + return + } + if (instance.Spec.MaxTrialCount != nil) && (completedTrialsCount >= *instance.Spec.MaxTrialCount) { - msg := "Experiment has succeeded" + msg := "Experiment has succeeded because max trial count has reached" instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg) return } if (instance.Spec.MaxFailedTrialCount != nil) && (failedTrialsCount >= *instance.Spec.MaxFailedTrialCount) { - msg := "Experiment has failed" + msg := "Experiment has failed because max failed count has reached" instance.MarkExperimentStatusFailed(ExperimentFailedReason, msg) return } - if isObjectiveGoalReached { - msg := "Experiment has succeeded" - instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg) - return - } msg := "Experiment is running" instance.MarkExperimentStatusRunning(ExperimentRunningReason, msg) } diff --git a/pkg/controller/v1alpha2/trial/trial_controller.go b/pkg/controller/v1alpha2/trial/trial_controller.go index 3ac8550723a..aefeeea7536 100644 --- a/pkg/controller/v1alpha2/trial/trial_controller.go +++ b/pkg/controller/v1alpha2/trial/trial_controller.go @@ -20,6 +20,7 @@ import ( "bytes" "context" + batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,7 +42,7 @@ import ( "github.com/kubeflow/katib/pkg/controller/v1alpha2/trial/util" ) -var log = logf.Log.WithName("controller") +var log = logf.Log.WithName("trial-controller") /** * USER ACTION REQUIRED: This is a scaffold file intended for the user to modify with their own Controller @@ -64,15 +65,28 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { // Create a new controller c, err := controller.New("trial-controller", mgr, controller.Options{Reconciler: r}) if err != nil { + log.Error(err, "Create trial controller error") return err } // Watch for changes to Trial err = c.Watch(&source.Kind{Type: &trialsv1alpha2.Trial{}}, &handler.EnqueueRequestForObject{}) if err != nil { + log.Error(err, "Trial watch error") return err } + err = c.Watch(&source.Kind{Type: &batchv1.Job{}}, + &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &trialsv1alpha2.Trial{}, + }) + if err != nil { + log.Error(err, "Job watch error") + return err + } + + log.Info("Trial controller created") return nil } @@ -90,6 +104,7 @@ type ReconcileTrial struct { // +kubebuilder:rbac:groups=trials.kubeflow.org,resources=trials/status,verbs=get;update;patch func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, error) { // Fetch the Trial instance + log := log.WithValues("Trial", request.NamespacedName) instance := &trialsv1alpha2.Trial{} requeue := false err := r.Get(context.TODO(), request.NamespacedName, instance) @@ -100,6 +115,7 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, return reconcile.Result{}, nil } // Error reading the object - requeue the request. + log.Error(err, "Trial Get error") return reconcile.Result{}, err } @@ -112,8 +128,9 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, } if !instance.IsCreated() { //Trial not created in DB - err = util.CreateTrialinDB(instance) + err = util.CreateTrialInDB(instance) if err != nil { + log.Error(err, "Create trial in DB error") return reconcile.Result{}, err } if instance.Status.StartTime == nil { @@ -128,18 +145,21 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, // Trial already created in DB err := r.reconcileTrial(instance) if err != nil { + log.Error(err, "Reconcile trial error") return reconcile.Result{}, err } } if !equality.Semantic.DeepEqual(original.Status, instance.Status) { //assuming that only status change - err = util.UpdateTrialStatusinDB(instance) + err = util.UpdateTrialStatusInDB(instance) if err != nil { + log.Error(err, "Update trial status in DB error") return reconcile.Result{}, err } err = r.Status().Update(context.TODO(), instance) if err != nil { + log.Error(err, "Update trial instance status error") return reconcile.Result{}, err } } @@ -150,14 +170,16 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha2.Trial) error { var err error + log := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) desiredJob, err := r.getDesiredJobSpec(instance) if err != nil { - log.Info("Error in getting Job Spec from instance") + log.Error(err, "Job Spec Get error") return err } deployedJob, err := r.reconcileJob(instance, desiredJob) if err != nil { + log.Error(err, "Reconcile job error") return err } @@ -165,9 +187,11 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha2.Trial) error { //TODO Can desired Spec differ from deployedSpec? if deployedJob != nil { if err = util.UpdateTrialStatusCondition(instance, deployedJob); err != nil { + log.Error(err, "Update trial status condition error") return err } if err = util.UpdateTrialStatusObservation(instance, deployedJob); err != nil { + log.Error(err, "Update trial status observation error") return err } } @@ -177,22 +201,24 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha2.Trial) error { func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha2.Trial, desiredJob *unstructured.Unstructured) (*unstructured.Unstructured, error) { var err error + log := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) apiVersion := desiredJob.GetAPIVersion() kind := desiredJob.GetKind() gvk := schema.FromAPIVersionAndKind(apiVersion, kind) deployedJob := &unstructured.Unstructured{} deployedJob.SetGroupVersionKind(gvk) - err = r.Get(context.TODO(), types.NamespacedName{Name: instance.Name, Namespace: instance.Namespace}, deployedJob) + err = r.Get(context.TODO(), types.NamespacedName{Name: desiredJob.GetName(), Namespace: desiredJob.GetNamespace()}, deployedJob) if err != nil { if errors.IsNotFound(err) { - log.Info("Creating Job", "namespace", instance.Namespace, "name", instance.Name, "kind", kind) + log.Info("Creating Job", "kind", kind) err = r.Create(context.TODO(), desiredJob) if err != nil { - log.Info("Error in creating job: %v ", err) + log.Error(err, "Create job error") return nil, err } } else { + log.Error(err, "Trial Get error") return nil, err } } @@ -205,16 +231,18 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha2.Trial, desiredJob } func (r *ReconcileTrial) getDesiredJobSpec(instance *trialsv1alpha2.Trial) (*unstructured.Unstructured, error) { - buf := bytes.NewBufferString(instance.Spec.RunSpec) + bufSize := 1024 + log := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + buf := bytes.NewBufferString(instance.Spec.RunSpec) desiredJobSpec := &unstructured.Unstructured{} if err := k8syaml.NewYAMLOrJSONDecoder(buf, bufSize).Decode(desiredJobSpec); err != nil { - log.Info("Yaml decode error %v", err) + log.Error(err, "Yaml decode error") return nil, err } if err := controllerutil.SetControllerReference(instance, desiredJobSpec, r.scheme); err != nil { - log.Info("SetControllerReference error %v", err) + log.Error(err, "SetControllerReference error") return nil, err } diff --git a/pkg/controller/v1alpha2/trial/util/api_util.go b/pkg/controller/v1alpha2/trial/util/api_util.go index 52a763894b2..93db52db3ed 100644 --- a/pkg/controller/v1alpha2/trial/util/api_util.go +++ b/pkg/controller/v1alpha2/trial/util/api_util.go @@ -21,12 +21,12 @@ import ( trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" ) -func CreateTrialinDB(instance *trialsv1alpha2.Trial) error { +func CreateTrialInDB(instance *trialsv1alpha2.Trial) error { return nil } -func UpdateTrialStatusinDB(instance *trialsv1alpha2.Trial) error { +func UpdateTrialStatusInDB(instance *trialsv1alpha2.Trial) error { return nil } diff --git a/pkg/controller/v1alpha2/trial/util/status_util.go b/pkg/controller/v1alpha2/trial/util/status_util.go index 677a875ecfe..64688f922aa 100644 --- a/pkg/controller/v1alpha2/trial/util/status_util.go +++ b/pkg/controller/v1alpha2/trial/util/status_util.go @@ -27,7 +27,7 @@ import ( commonv1beta1 "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta1" ) -var log = logf.Log.WithName("controller") +var log = logf.Log.WithName("trial-status-util") const ( DefaultJobKind = "Job" @@ -51,7 +51,7 @@ func UpdateTrialStatusCondition(instance *trialsv1alpha2.Trial, deployedJob *uns jobStatus := batchv1.JobStatus{} err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus) if err != nil { - log.Info("Error in converting unstructured to status: %v ", err) + log.Error(err, "Convert unstructured to status error") return err } if jobStatus.Active == 0 && jobStatus.Succeeded > 0 { @@ -66,7 +66,7 @@ func UpdateTrialStatusCondition(instance *trialsv1alpha2.Trial, deployedJob *uns err := runtime.DefaultUnstructuredConverter.FromUnstructured(statusMap, &jobStatus) if err != nil { - log.Info("Error in converting unstructured to status: %v ", err) + log.Error(err, "Convert unstructured to status error") return err } if len(jobStatus.Conditions) > 0 { @@ -81,6 +81,7 @@ func UpdateTrialStatusCondition(instance *trialsv1alpha2.Trial, deployedJob *uns } } } else if unerr != nil { + log.Error(unerr, "NestedFieldCopy unstructured to status error") return unerr } return nil From 8c48f16de42f434f4658d02317c13939489041b7 Mon Sep 17 00:00:00 2001 From: Johnu George Date: Sun, 21 Apr 2019 16:28:40 +0530 Subject: [PATCH 3/5] Adding comments --- .../v1alpha2/experiment/util/status_util.go | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/pkg/controller/v1alpha2/experiment/util/status_util.go b/pkg/controller/v1alpha2/experiment/util/status_util.go index 734c42c49d3..e2923eb5aa2 100644 --- a/pkg/controller/v1alpha2/experiment/util/status_util.go +++ b/pkg/controller/v1alpha2/experiment/util/status_util.go @@ -16,9 +16,6 @@ limitations under the License. package util import ( - "errors" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" @@ -37,37 +34,23 @@ const ( func UpdateExperimentStatus(instance *experimentsv1alpha2.Experiment, trials *trialsv1alpha2.TrialList) error { - isObjectiveGoalReached, err := updateTrialsSummary(instance, trials) - if err != nil { - return err - } + isObjectiveGoalReached := updateTrialsSummary(instance, trials) updateExperimentStatusCondition(instance, isObjectiveGoalReached) return nil } -func updateTrialsSummary(instance *experimentsv1alpha2.Experiment, trials *trialsv1alpha2.TrialList) (bool, error) { +func updateTrialsSummary(instance *experimentsv1alpha2.Experiment, trials *trialsv1alpha2.TrialList) bool { var trialsPending, trialsRunning, trialsSucceeded, trialsFailed, trialsKilled int - var bestTrialIndex int var bestTrialValue float64 + bestTrialIndex := -1 isObjectiveGoalReached := false objectiveValueGoal := *instance.Spec.Objective.Goal objectiveType := instance.Spec.Objective.Type objectiveMetricName := instance.Spec.Objective.ObjectiveMetricName - if objectiveMetricValue := getObjectiveMetricValue(trials.Items[0], objectiveMetricName); objectiveMetricValue != nil { - bestTrialValue = *objectiveMetricValue - if bestTrialValue <= objectiveValueGoal { - isObjectiveGoalReached = true - } - } else { - //may be log - err := errors.New(string(metav1.StatusReasonNotFound)) - return isObjectiveGoalReached, err - } - for index, trial := range trials.Items { if trial.IsKilled() { trialsKilled++ @@ -83,9 +66,14 @@ func updateTrialsSummary(instance *experimentsv1alpha2.Experiment, trials *trial objectiveMetricValue := getObjectiveMetricValue(trial, objectiveMetricName) if objectiveMetricValue == nil { - //may be log - err := errors.New(string(metav1.StatusReasonNotFound)) - return isObjectiveGoalReached, err + log.Info("Objective metric name not found", "trial", trial.GetName()) + continue + } + + //intialize vars to objective metric value of the first trial + if bestTrialIndex == -1 { + bestTrialValue = *objectiveMetricValue + bestTrialIndex = index } if objectiveType == experimentsv1alpha2.ObjectiveTypeMinimize { @@ -123,7 +111,7 @@ func updateTrialsSummary(instance *experimentsv1alpha2.Experiment, trials *trial for _, metric := range bestTrial.Status.Observation.Metrics { instance.Status.CurrentOptimalTrial.Observation.Metrics = append(instance.Status.CurrentOptimalTrial.Observation.Metrics, metric) } - return isObjectiveGoalReached, nil + return isObjectiveGoalReached } func getObjectiveMetricValue(trial trialsv1alpha2.Trial, objectiveMetricName string) *float64 { From 169b03c3ffd7f77c8dc2d8014f475250539c2636 Mon Sep 17 00:00:00 2001 From: Johnu George Date: Mon, 22 Apr 2019 23:35:37 +0530 Subject: [PATCH 4/5] Adding template functions for experiment --- cmd/katib-controller/v1alpha2/main.go | 3 - .../v1alpha2/zz_generated.deepcopy.go | 10 ++ .../experiment/experiment_controller.go | 155 ++++++++++++++++-- .../v1alpha2/experiment/util/api_util.go | 6 + .../v1alpha2/experiment/util/status_util.go | 19 ++- .../v1alpha2/trial/trial_controller.go | 42 ++--- 6 files changed, 185 insertions(+), 50 deletions(-) diff --git a/cmd/katib-controller/v1alpha2/main.go b/cmd/katib-controller/v1alpha2/main.go index b6295202651..9d2c89f6d96 100644 --- a/cmd/katib-controller/v1alpha2/main.go +++ b/cmd/katib-controller/v1alpha2/main.go @@ -16,9 +16,6 @@ limitations under the License. /* Katib-controller is a controller (operator) for Experiments and Trials - Katib-controller create and watch workers and metricscollectors. - The workers and metricscollectors are generated from template defined ConfigMap. - The workers and metricscollectors are kubernetes object. The default object is a Job and CronJob. */ package main diff --git a/pkg/api/operators/apis/experiment/v1alpha2/zz_generated.deepcopy.go b/pkg/api/operators/apis/experiment/v1alpha2/zz_generated.deepcopy.go index 7bc246dcc47..e5f0943453b 100644 --- a/pkg/api/operators/apis/experiment/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/api/operators/apis/experiment/v1alpha2/zz_generated.deepcopy.go @@ -300,6 +300,11 @@ func (in *GoTemplate) DeepCopy() *GoTemplate { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GraphConfig) DeepCopyInto(out *GraphConfig) { *out = *in + if in.NumLayers != nil { + in, out := &in.NumLayers, &out.NumLayers + *out = new(int32) + **out = **in + } if in.InputSizes != nil { in, out := &in.InputSizes, &out.InputSizes *out = make([]int32, len(*in)) @@ -350,6 +355,11 @@ func (in *NasConfig) DeepCopy() *NasConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ObjectiveSpec) DeepCopyInto(out *ObjectiveSpec) { *out = *in + if in.Goal != nil { + in, out := &in.Goal, &out.Goal + *out = new(float64) + **out = **in + } if in.AdditionalMetricsNames != nil { in, out := &in.AdditionalMetricsNames, &out.AdditionalMetricsNames *out = make([]string, len(*in)) diff --git a/pkg/controller/v1alpha2/experiment/experiment_controller.go b/pkg/controller/v1alpha2/experiment/experiment_controller.go index c67e6356740..3c5813f1f27 100644 --- a/pkg/controller/v1alpha2/experiment/experiment_controller.go +++ b/pkg/controller/v1alpha2/experiment/experiment_controller.go @@ -17,15 +17,20 @@ limitations under the License. package experiment import ( + "bytes" "context" + "os" + "text/template" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -34,6 +39,7 @@ import ( experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" + apiv1alpha2 "github.com/kubeflow/katib/pkg/api/v1alpha2" "github.com/kubeflow/katib/pkg/controller/v1alpha2/experiment/util" ) @@ -102,10 +108,10 @@ type ReconcileExperiment struct { // +kubebuilder:rbac:groups=experiments.kubeflow.org,resources=experiments/status,verbs=get;update;patch func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Result, error) { // Fetch the Experiment instance - log = log.WithValues("Experiment", request.NamespacedName) - instance := &experimentsv1alpha2.Experiment{} + logger := log.WithValues("Experiment", request.NamespacedName) + original := &experimentsv1alpha2.Experiment{} requeue := false - err := r.Get(context.TODO(), request.NamespacedName, instance) + err := r.Get(context.TODO(), request.NamespacedName, original) if err != nil { if errors.IsNotFound(err) { // Object not found, return. Created objects are automatically garbage collected. @@ -113,10 +119,10 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re return reconcile.Result{}, nil } // Error reading the object - requeue the request. - log.Error(err, "Experiment Get error") + logger.Error(err, "Experiment Get error") return reconcile.Result{}, err } - original := instance.DeepCopy() + instance := original.DeepCopy() if instance.IsCompleted() { @@ -127,7 +133,7 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re //Experiment not created in DB err = util.CreateExperimentInDB(instance) if err != nil { - log.Error(err, "Create experiment in DB error") + logger.Error(err, "Create experiment in DB error") return reconcile.Result{}, err } @@ -142,7 +148,7 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re // Experiment already created in DB err := r.ReconcileExperiment(instance) if err != nil { - log.Error(err, "Reconcile experiment error") + logger.Error(err, "Reconcile experiment error") return reconcile.Result{}, err } } @@ -151,12 +157,12 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re //assuming that only status change err = util.UpdateExperimentStatusInDB(instance) if err != nil { - log.Error(err, "Update experiment status in DB error") + logger.Error(err, "Update experiment status in DB error") return reconcile.Result{}, err } err = r.Status().Update(context.TODO(), instance) if err != nil { - log.Error(err, "Update experiment instance status error") + logger.Error(err, "Update experiment instance status error") return reconcile.Result{}, err } } @@ -167,7 +173,7 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha2.Experiment) error { var err error - log := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) trials := &trialsv1alpha2.TrialList{} labels := map[string]string{"experiment": instance.Name} lo := &client.ListOptions{} @@ -175,13 +181,13 @@ func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha2. err = r.List(context.TODO(), lo, trials) if err != nil { - log.Error(err, "Trial List error") + logger.Error(err, "Trial List error") return err } if len(trials.Items) > 0 { err := util.UpdateExperimentStatus(instance, trials) if err != nil { - log.Error(err, "Update experiment status error") + logger.Error(err, "Update experiment status error") return err } } @@ -195,7 +201,7 @@ func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha2. func (r *ReconcileExperiment) ReconcileTrials(instance *experimentsv1alpha2.Experiment) error { var err error - log := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) parallelCount := 0 if instance.Spec.ParallelTrialCount != nil { @@ -203,13 +209,15 @@ func (r *ReconcileExperiment) ReconcileTrials(instance *experimentsv1alpha2.Expe } else { parallelCount = 3 } - activeCount := instance.Status.TrialsRunning - succeededCount := instance.Status.TrialsSucceeded + activeCount := instance.Status.TrialsPending + instance.Status.TrialsRunning + completedCount := instance.Status.TrialsSucceeded + instance.Status.TrialsFailed + instance.Status.TrialsKilled if activeCount > parallelCount { deleteCount := activeCount - parallelCount if deleteCount > 0 { //delete 'deleteCount' number of trails. Sort them? + logger.Info("DeleteTrials", "deleteCount", deleteCount) + err = r.deleteTrials(instance, deleteCount) } } else if activeCount < parallelCount { @@ -217,7 +225,7 @@ func (r *ReconcileExperiment) ReconcileTrials(instance *experimentsv1alpha2.Expe if instance.Spec.MaxTrialCount == nil { requiredActiveCount = parallelCount } else { - requiredActiveCount = *instance.Spec.MaxTrialCount - succeededCount + requiredActiveCount = *instance.Spec.MaxTrialCount - completedCount if requiredActiveCount > parallelCount { requiredActiveCount = parallelCount } @@ -225,15 +233,126 @@ func (r *ReconcileExperiment) ReconcileTrials(instance *experimentsv1alpha2.Expe addCount := requiredActiveCount - activeCount if addCount < 0 { - log.Info("Invalid setting", "requiredActiveCount", requiredActiveCount, "MaxTrialCount", - *instance.Spec.MaxTrialCount, "SucceededCount", succeededCount) + logger.Info("Invalid setting", "requiredActiveCount", requiredActiveCount, "MaxTrialCount", + *instance.Spec.MaxTrialCount, "CompletedCount", completedCount) addCount = 0 } //create "addCount" number of trials + logger.Info("CreateTrials", "addCount", addCount) + err = r.createTrials(instance, addCount) } return err } + +type TrialTemplateParams struct { + Experiment string + Trial string + NameSpace string + HyperParameters []*apiv1alpha2.ParameterAssignment +} + +func (r *ReconcileExperiment) createTrials(instance *experimentsv1alpha2.Experiment, addCount int) error { + + logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + trials, err := util.GetSuggestions(instance, addCount) + /*trials := []apiv1alpha2.Trial{ + apiv1alpha2.Trial{Spec: &apiv1alpha2.TrialSpec{}}, apiv1alpha2.Trial{Spec: &apiv1alpha2.TrialSpec{}}, + }*/ + + trialTemplate, err := r.getTrialTemplate(instance) + if err != nil { + logger.Error(err, "Get trial template error") + } + for _, elem := range trials { + + trial := &trialsv1alpha2.Trial{} + trial.Name = string(uuid.NewUUID()) + trial.Namespace = instance.GetNamespace() + trial.Labels = map[string]string{"experiment": instance.GetName()} + + if err := controllerutil.SetControllerReference(instance, trial, r.scheme); err != nil { + logger.Error(err, "Set controller reference error") + } + + trialParams := TrialTemplateParams{ + Experiment: instance.GetName(), + Trial: trial.Name, + NameSpace: trial.Namespace, + } + + var buf bytes.Buffer + if elem.Spec != nil && elem.Spec.ParameterAssignments != nil { + for _, p := range elem.Spec.ParameterAssignments.Assignments { + trialParams.HyperParameters = append(trialParams.HyperParameters, p) + } + } + err = trialTemplate.Execute(&buf, trialParams) + if err != nil { + logger.Error(err, "Template execute error") + } + + trial.Spec.RunSpec = buf.String() + + err := r.Create(context.TODO(), trial) + if err != nil { + logger.Error(err, "Trial create error", "Trial name", trial.Name) + } + + } + return err +} + +func (r *ReconcileExperiment) getTrialTemplate(instance *experimentsv1alpha2.Experiment) (*template.Template, error) { + + var err error + var tpl *template.Template = nil + logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + trialTemplate := instance.Spec.TrialTemplate + if trialTemplate != nil && trialTemplate.GoTemplate.RawTemplate != "" { + tpl, err = template.New("Trial").Parse(trialTemplate.GoTemplate.RawTemplate) + } else { + //default values if user hasn't set + configMapNS := os.Getenv("KATIB_CORE_NAMESPACE") + configMapName := "trial-template" + templatePath := "defaultTrialTemplate.yaml" + if trialTemplate != nil && trialTemplate.GoTemplate.TemplateSpec != nil { + templateSpec := trialTemplate.GoTemplate.TemplateSpec + if templateSpec.ConfigMapName != "" { + configMapName = templateSpec.ConfigMapName + } + if templateSpec.ConfigMapNamespace != "" { + configMapNS = templateSpec.ConfigMapNamespace + } + if templateSpec.TemplatePath != "" { + templatePath = templateSpec.TemplatePath + } + } + configMap, err := r.getConfigMap(configMapName, configMapNS) + if err != nil { + logger.Error(err, "Get config map error", "configMapName", configMapName, "configMapNS", configMapNS) + } + if configMapTemplate, ok := configMap[templatePath]; !ok { + } else { + tpl, err = template.New("Trial").Parse(configMapTemplate) + } + } + if err != nil { + logger.Error(err, "Template parse error") + } + + return tpl, err +} + +func (r *ReconcileExperiment) deleteTrials(instance *experimentsv1alpha2.Experiment, deleteCount int) error { + + return nil +} + +func (r *ReconcileExperiment) getConfigMap(name, namespace string) (map[string]string, error) { + + return nil, nil +} diff --git a/pkg/controller/v1alpha2/experiment/util/api_util.go b/pkg/controller/v1alpha2/experiment/util/api_util.go index 348e822c3d2..f2b9a9b0cdc 100644 --- a/pkg/controller/v1alpha2/experiment/util/api_util.go +++ b/pkg/controller/v1alpha2/experiment/util/api_util.go @@ -19,6 +19,7 @@ import ( //v1 "k8s.io/api/core/v1" experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" + trialapi "github.com/kubeflow/katib/pkg/api/v1alpha2" ) func CreateExperimentInDB(instance *experimentsv1alpha2.Experiment) error { @@ -30,3 +31,8 @@ func UpdateExperimentStatusInDB(instance *experimentsv1alpha2.Experiment) error return nil } + +func GetSuggestions(instance *experimentsv1alpha2.Experiment, addCount int) ([]*trialapi.Trial, error) { + + return nil, nil +} diff --git a/pkg/controller/v1alpha2/experiment/util/status_util.go b/pkg/controller/v1alpha2/experiment/util/status_util.go index e2923eb5aa2..86c984413a9 100644 --- a/pkg/controller/v1alpha2/experiment/util/status_util.go +++ b/pkg/controller/v1alpha2/experiment/util/status_util.go @@ -100,16 +100,19 @@ func updateTrialsSummary(instance *experimentsv1alpha2.Experiment, trials *trial instance.Status.TrialsFailed = trialsFailed instance.Status.TrialsKilled = trialsKilled - bestTrial := trials.Items[bestTrialIndex] + // if best trial is set + if bestTrialIndex != -1 { + bestTrial := trials.Items[bestTrialIndex] - instance.Status.CurrentOptimalTrial.ParameterAssignments = []trialsv1alpha2.ParameterAssignment{} - for _, parameterAssigment := range bestTrial.Spec.ParameterAssignments { - instance.Status.CurrentOptimalTrial.ParameterAssignments = append(instance.Status.CurrentOptimalTrial.ParameterAssignments, parameterAssigment) - } + instance.Status.CurrentOptimalTrial.ParameterAssignments = []trialsv1alpha2.ParameterAssignment{} + for _, parameterAssigment := range bestTrial.Spec.ParameterAssignments { + instance.Status.CurrentOptimalTrial.ParameterAssignments = append(instance.Status.CurrentOptimalTrial.ParameterAssignments, parameterAssigment) + } - instance.Status.CurrentOptimalTrial.Observation.Metrics = []trialsv1alpha2.Metric{} - for _, metric := range bestTrial.Status.Observation.Metrics { - instance.Status.CurrentOptimalTrial.Observation.Metrics = append(instance.Status.CurrentOptimalTrial.Observation.Metrics, metric) + instance.Status.CurrentOptimalTrial.Observation.Metrics = []trialsv1alpha2.Metric{} + for _, metric := range bestTrial.Status.Observation.Metrics { + instance.Status.CurrentOptimalTrial.Observation.Metrics = append(instance.Status.CurrentOptimalTrial.Observation.Metrics, metric) + } } return isObjectiveGoalReached } diff --git a/pkg/controller/v1alpha2/trial/trial_controller.go b/pkg/controller/v1alpha2/trial/trial_controller.go index aefeeea7536..0167698ebff 100644 --- a/pkg/controller/v1alpha2/trial/trial_controller.go +++ b/pkg/controller/v1alpha2/trial/trial_controller.go @@ -104,10 +104,10 @@ type ReconcileTrial struct { // +kubebuilder:rbac:groups=trials.kubeflow.org,resources=trials/status,verbs=get;update;patch func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, error) { // Fetch the Trial instance - log := log.WithValues("Trial", request.NamespacedName) - instance := &trialsv1alpha2.Trial{} + logger := log.WithValues("Trial", request.NamespacedName) + original := &trialsv1alpha2.Trial{} requeue := false - err := r.Get(context.TODO(), request.NamespacedName, instance) + err := r.Get(context.TODO(), request.NamespacedName, original) if err != nil { if errors.IsNotFound(err) { // Object not found, return. Created objects are automatically garbage collected. @@ -115,11 +115,11 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, return reconcile.Result{}, nil } // Error reading the object - requeue the request. - log.Error(err, "Trial Get error") + logger.Error(err, "Trial Get error") return reconcile.Result{}, err } - original := instance.DeepCopy() + instance := original.DeepCopy() if instance.IsCompleted() { @@ -130,7 +130,7 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, //Trial not created in DB err = util.CreateTrialInDB(instance) if err != nil { - log.Error(err, "Create trial in DB error") + logger.Error(err, "Create trial in DB error") return reconcile.Result{}, err } if instance.Status.StartTime == nil { @@ -145,7 +145,7 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, // Trial already created in DB err := r.reconcileTrial(instance) if err != nil { - log.Error(err, "Reconcile trial error") + logger.Error(err, "Reconcile trial error") return reconcile.Result{}, err } } @@ -154,12 +154,12 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, //assuming that only status change err = util.UpdateTrialStatusInDB(instance) if err != nil { - log.Error(err, "Update trial status in DB error") + logger.Error(err, "Update trial status in DB error") return reconcile.Result{}, err } err = r.Status().Update(context.TODO(), instance) if err != nil { - log.Error(err, "Update trial instance status error") + logger.Error(err, "Update trial instance status error") return reconcile.Result{}, err } } @@ -170,16 +170,16 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha2.Trial) error { var err error - log := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + logger := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) desiredJob, err := r.getDesiredJobSpec(instance) if err != nil { - log.Error(err, "Job Spec Get error") + logger.Error(err, "Job Spec Get error") return err } deployedJob, err := r.reconcileJob(instance, desiredJob) if err != nil { - log.Error(err, "Reconcile job error") + logger.Error(err, "Reconcile job error") return err } @@ -187,11 +187,11 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha2.Trial) error { //TODO Can desired Spec differ from deployedSpec? if deployedJob != nil { if err = util.UpdateTrialStatusCondition(instance, deployedJob); err != nil { - log.Error(err, "Update trial status condition error") + logger.Error(err, "Update trial status condition error") return err } if err = util.UpdateTrialStatusObservation(instance, deployedJob); err != nil { - log.Error(err, "Update trial status observation error") + logger.Error(err, "Update trial status observation error") return err } } @@ -201,7 +201,7 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1alpha2.Trial) error { func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha2.Trial, desiredJob *unstructured.Unstructured) (*unstructured.Unstructured, error) { var err error - log := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + logger := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) apiVersion := desiredJob.GetAPIVersion() kind := desiredJob.GetKind() gvk := schema.FromAPIVersionAndKind(apiVersion, kind) @@ -211,14 +211,14 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha2.Trial, desiredJob err = r.Get(context.TODO(), types.NamespacedName{Name: desiredJob.GetName(), Namespace: desiredJob.GetNamespace()}, deployedJob) if err != nil { if errors.IsNotFound(err) { - log.Info("Creating Job", "kind", kind) + logger.Info("Creating Job", "kind", kind) err = r.Create(context.TODO(), desiredJob) if err != nil { - log.Error(err, "Create job error") + logger.Error(err, "Create job error") return nil, err } } else { - log.Error(err, "Trial Get error") + logger.Error(err, "Trial Get error") return nil, err } } @@ -233,16 +233,16 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1alpha2.Trial, desiredJob func (r *ReconcileTrial) getDesiredJobSpec(instance *trialsv1alpha2.Trial) (*unstructured.Unstructured, error) { bufSize := 1024 - log := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + logger := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) buf := bytes.NewBufferString(instance.Spec.RunSpec) desiredJobSpec := &unstructured.Unstructured{} if err := k8syaml.NewYAMLOrJSONDecoder(buf, bufSize).Decode(desiredJobSpec); err != nil { - log.Error(err, "Yaml decode error") + logger.Error(err, "Yaml decode error") return nil, err } if err := controllerutil.SetControllerReference(instance, desiredJobSpec, r.scheme); err != nil { - log.Error(err, "SetControllerReference error") + logger.Error(err, "Set controller reference error") return nil, err } From e40b89542d8316eafcfc35fb5bf301526365e7d7 Mon Sep 17 00:00:00 2001 From: Johnu George Date: Tue, 23 Apr 2019 17:30:29 +0530 Subject: [PATCH 5/5] Adding error checks --- .../apis/experiment/v1alpha2/constants.go | 30 ++++ .../experiment/v1alpha2/experiment_types.go | 1 + .../experiment/experiment_controller.go | 128 ++++-------------- .../v1alpha2/experiment/experiment_util.go | 123 +++++++++++++++++ .../v1alpha2/trial/metriccollector.go | 16 +++ 5 files changed, 193 insertions(+), 105 deletions(-) create mode 100644 pkg/api/operators/apis/experiment/v1alpha2/constants.go create mode 100644 pkg/controller/v1alpha2/experiment/experiment_util.go create mode 100644 pkg/controller/v1alpha2/trial/metriccollector.go diff --git a/pkg/api/operators/apis/experiment/v1alpha2/constants.go b/pkg/api/operators/apis/experiment/v1alpha2/constants.go new file mode 100644 index 00000000000..2a00316a5ce --- /dev/null +++ b/pkg/api/operators/apis/experiment/v1alpha2/constants.go @@ -0,0 +1,30 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package v1alpha2 + +const ( + // Default value of Spec.ParallelTrialCount + DefaultTrialParallelCount = 3 + + // Default value of Spec.ConfigMapName + DefaultTrialConfigMapName = "trial-template" + + // Default env name of katib namespace + DefaultKatibNamespaceEnvName = "KATIB_CORE_NAMESPACE" + + // Default value of Spec.TemplatePath + DefaultTrialTemplatePath = "defaultTrialTemplate.yaml" +) diff --git a/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go b/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go index 89b7b4d9c53..dfdc125c4b3 100644 --- a/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go +++ b/pkg/api/operators/apis/experiment/v1alpha2/experiment_types.go @@ -35,6 +35,7 @@ type ExperimentSpec struct { TrialTemplate *TrialTemplate `json:"trialTemplate,omitempty"` // How many trials can be processed in parallel. + // Defaults to 3 ParallelTrialCount *int `json:"parallelTrialCount,omitempty"` // Max completed trials to mark experiment as succeeded diff --git a/pkg/controller/v1alpha2/experiment/experiment_controller.go b/pkg/controller/v1alpha2/experiment/experiment_controller.go index 3c5813f1f27..1ed1f477b2e 100644 --- a/pkg/controller/v1alpha2/experiment/experiment_controller.go +++ b/pkg/controller/v1alpha2/experiment/experiment_controller.go @@ -17,20 +17,15 @@ limitations under the License. package experiment import ( - "bytes" "context" - "os" - "text/template" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/uuid" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -39,7 +34,6 @@ import ( experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" - apiv1alpha2 "github.com/kubeflow/katib/pkg/api/v1alpha2" "github.com/kubeflow/katib/pkg/controller/v1alpha2/experiment/util" ) @@ -172,21 +166,18 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha2.Experiment) error { - var err error logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) trials := &trialsv1alpha2.TrialList{} labels := map[string]string{"experiment": instance.Name} lo := &client.ListOptions{} lo.MatchingLabels(labels).InNamespace(instance.Namespace) - err = r.List(context.TODO(), lo, trials) - if err != nil { + if err := r.List(context.TODO(), lo, trials); err != nil { logger.Error(err, "Trial List error") return err } if len(trials.Items) > 0 { - err := util.UpdateExperimentStatus(instance, trials) - if err != nil { + if err := util.UpdateExperimentStatus(instance, trials); err != nil { logger.Error(err, "Update experiment status error") return err } @@ -195,19 +186,18 @@ func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha2. if reconcileRequired { r.ReconcileTrials(instance) } - return err + return nil } func (r *ReconcileExperiment) ReconcileTrials(instance *experimentsv1alpha2.Experiment) error { - var err error logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) parallelCount := 0 if instance.Spec.ParallelTrialCount != nil { parallelCount = *instance.Spec.ParallelTrialCount } else { - parallelCount = 3 + parallelCount = experimentsv1alpha2.DefaultTrialParallelCount } activeCount := instance.Status.TrialsPending + instance.Status.TrialsRunning completedCount := instance.Status.TrialsSucceeded + instance.Status.TrialsFailed + instance.Status.TrialsKilled @@ -217,7 +207,10 @@ func (r *ReconcileExperiment) ReconcileTrials(instance *experimentsv1alpha2.Expe if deleteCount > 0 { //delete 'deleteCount' number of trails. Sort them? logger.Info("DeleteTrials", "deleteCount", deleteCount) - err = r.deleteTrials(instance, deleteCount) + if err := r.deleteTrials(instance, deleteCount); err != nil { + logger.Error(err, "Delete trials error") + return err + } } } else if activeCount < parallelCount { @@ -240,25 +233,25 @@ func (r *ReconcileExperiment) ReconcileTrials(instance *experimentsv1alpha2.Expe //create "addCount" number of trials logger.Info("CreateTrials", "addCount", addCount) - err = r.createTrials(instance, addCount) + if err := r.createTrials(instance, addCount); err != nil { + logger.Error(err, "Create trials error") + return err + } } - return err - -} + return nil -type TrialTemplateParams struct { - Experiment string - Trial string - NameSpace string - HyperParameters []*apiv1alpha2.ParameterAssignment } func (r *ReconcileExperiment) createTrials(instance *experimentsv1alpha2.Experiment, addCount int) error { logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) trials, err := util.GetSuggestions(instance, addCount) + if err != nil { + logger.Error(err, "Get suggestions error") + return err + } /*trials := []apiv1alpha2.Trial{ apiv1alpha2.Trial{Spec: &apiv1alpha2.TrialSpec{}}, apiv1alpha2.Trial{Spec: &apiv1alpha2.TrialSpec{}}, }*/ @@ -266,93 +259,18 @@ func (r *ReconcileExperiment) createTrials(instance *experimentsv1alpha2.Experim trialTemplate, err := r.getTrialTemplate(instance) if err != nil { logger.Error(err, "Get trial template error") + return err } - for _, elem := range trials { - - trial := &trialsv1alpha2.Trial{} - trial.Name = string(uuid.NewUUID()) - trial.Namespace = instance.GetNamespace() - trial.Labels = map[string]string{"experiment": instance.GetName()} - - if err := controllerutil.SetControllerReference(instance, trial, r.scheme); err != nil { - logger.Error(err, "Set controller reference error") - } - - trialParams := TrialTemplateParams{ - Experiment: instance.GetName(), - Trial: trial.Name, - NameSpace: trial.Namespace, - } - - var buf bytes.Buffer - if elem.Spec != nil && elem.Spec.ParameterAssignments != nil { - for _, p := range elem.Spec.ParameterAssignments.Assignments { - trialParams.HyperParameters = append(trialParams.HyperParameters, p) - } - } - err = trialTemplate.Execute(&buf, trialParams) - if err != nil { - logger.Error(err, "Template execute error") - } - - trial.Spec.RunSpec = buf.String() - - err := r.Create(context.TODO(), trial) - if err != nil { - logger.Error(err, "Trial create error", "Trial name", trial.Name) - } - - } - return err -} - -func (r *ReconcileExperiment) getTrialTemplate(instance *experimentsv1alpha2.Experiment) (*template.Template, error) { - - var err error - var tpl *template.Template = nil - logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) - trialTemplate := instance.Spec.TrialTemplate - if trialTemplate != nil && trialTemplate.GoTemplate.RawTemplate != "" { - tpl, err = template.New("Trial").Parse(trialTemplate.GoTemplate.RawTemplate) - } else { - //default values if user hasn't set - configMapNS := os.Getenv("KATIB_CORE_NAMESPACE") - configMapName := "trial-template" - templatePath := "defaultTrialTemplate.yaml" - if trialTemplate != nil && trialTemplate.GoTemplate.TemplateSpec != nil { - templateSpec := trialTemplate.GoTemplate.TemplateSpec - if templateSpec.ConfigMapName != "" { - configMapName = templateSpec.ConfigMapName - } - if templateSpec.ConfigMapNamespace != "" { - configMapNS = templateSpec.ConfigMapNamespace - } - if templateSpec.TemplatePath != "" { - templatePath = templateSpec.TemplatePath - } - } - configMap, err := r.getConfigMap(configMapName, configMapNS) - if err != nil { - logger.Error(err, "Get config map error", "configMapName", configMapName, "configMapNS", configMapNS) + for _, trial := range trials { + if err = r.createTrialInstance(instance, trial, trialTemplate); err != nil { + logger.Error(err, "Create trial instance error", "trial", trial) + continue } - if configMapTemplate, ok := configMap[templatePath]; !ok { - } else { - tpl, err = template.New("Trial").Parse(configMapTemplate) - } - } - if err != nil { - logger.Error(err, "Template parse error") } - - return tpl, err + return nil } func (r *ReconcileExperiment) deleteTrials(instance *experimentsv1alpha2.Experiment, deleteCount int) error { return nil } - -func (r *ReconcileExperiment) getConfigMap(name, namespace string) (map[string]string, error) { - - return nil, nil -} diff --git a/pkg/controller/v1alpha2/experiment/experiment_util.go b/pkg/controller/v1alpha2/experiment/experiment_util.go new file mode 100644 index 00000000000..cca21cf9030 --- /dev/null +++ b/pkg/controller/v1alpha2/experiment/experiment_util.go @@ -0,0 +1,123 @@ +package experiment + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "text/template" + + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utilrand "k8s.io/apimachinery/pkg/util/rand" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + experimentsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/experiment/v1alpha2" + trialsv1alpha2 "github.com/kubeflow/katib/pkg/api/operators/apis/trial/v1alpha2" + apiv1alpha2 "github.com/kubeflow/katib/pkg/api/v1alpha2" +) + +type TrialTemplateParams struct { + Experiment string + Trial string + NameSpace string + HyperParameters []*apiv1alpha2.ParameterAssignment +} + +func (r *ReconcileExperiment) createTrialInstance(expInstance *experimentsv1alpha2.Experiment, trialInstance *apiv1alpha2.Trial, trialTemplate *template.Template) error { + logger := log.WithValues("Experiment", types.NamespacedName{Name: expInstance.GetName(), Namespace: expInstance.GetNamespace()}) + + trial := &trialsv1alpha2.Trial{} + trial.Name = fmt.Sprintf("%s-%s", expInstance.GetName(), utilrand.String(8)) + trial.Namespace = expInstance.GetNamespace() + trial.Labels = map[string]string{"experiment": expInstance.GetName()} + + if err := controllerutil.SetControllerReference(expInstance, trial, r.scheme); err != nil { + logger.Error(err, "Set controller reference error") + return err + } + + trialParams := TrialTemplateParams{ + Experiment: expInstance.GetName(), + Trial: trial.Name, + NameSpace: trial.Namespace, + } + + var buf bytes.Buffer + if trialInstance.Spec != nil && trialInstance.Spec.ParameterAssignments != nil { + for _, p := range trialInstance.Spec.ParameterAssignments.Assignments { + trialParams.HyperParameters = append(trialParams.HyperParameters, p) + } + } + if err := trialTemplate.Execute(&buf, trialParams); err != nil { + logger.Error(err, "Template execute error") + return err + } + + trial.Spec.RunSpec = buf.String() + + if err := r.Create(context.TODO(), trial); err != nil { + logger.Error(err, "Trial create error", "Trial name", trial.Name) + return err + } + return nil + +} + +func (r *ReconcileExperiment) getTrialTemplate(instance *experimentsv1alpha2.Experiment) (*template.Template, error) { + + var err error + var tpl *template.Template = nil + logger := log.WithValues("Experiment", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) + trialTemplate := instance.Spec.TrialTemplate + if trialTemplate != nil && trialTemplate.GoTemplate.RawTemplate != "" { + tpl, err = template.New("Trial").Parse(trialTemplate.GoTemplate.RawTemplate) + } else { + //default values if user hasn't set + configMapNS := os.Getenv(experimentsv1alpha2.DefaultKatibNamespaceEnvName) + configMapName := experimentsv1alpha2.DefaultTrialConfigMapName + templatePath := experimentsv1alpha2.DefaultTrialTemplatePath + + if trialTemplate != nil && trialTemplate.GoTemplate.TemplateSpec != nil { + templateSpec := trialTemplate.GoTemplate.TemplateSpec + if templateSpec.ConfigMapName != "" { + configMapName = templateSpec.ConfigMapName + } + if templateSpec.ConfigMapNamespace != "" { + configMapNS = templateSpec.ConfigMapNamespace + } + if templateSpec.TemplatePath != "" { + templatePath = templateSpec.TemplatePath + } + } + configMap, err := r.getConfigMap(configMapName, configMapNS) + if err != nil { + logger.Error(err, "Get config map error", "configMapName", configMapName, "configMapNS", configMapNS) + return nil, err + } + if configMapTemplate, ok := configMap[templatePath]; !ok { + err = errors.New(string(metav1.StatusReasonNotFound)) + logger.Error(err, "Config map template not found", "templatePath", templatePath) + return nil, err + } else { + tpl, err = template.New("Trial").Parse(configMapTemplate) + } + } + if err != nil { + logger.Error(err, "Template parse error") + return nil, err + } + + return tpl, nil +} + +func (r *ReconcileExperiment) getConfigMap(name, namespace string) (map[string]string, error) { + + configMap := &apiv1.ConfigMap{} + if err := r.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, configMap); err != nil { + return map[string]string{}, err + } + return configMap.Data, nil +} diff --git a/pkg/controller/v1alpha2/trial/metriccollector.go b/pkg/controller/v1alpha2/trial/metriccollector.go new file mode 100644 index 00000000000..7f5b24bbec0 --- /dev/null +++ b/pkg/controller/v1alpha2/trial/metriccollector.go @@ -0,0 +1,16 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package trial