diff --git a/pkg/apis/controller/experiments/v1alpha3/util.go b/pkg/apis/controller/experiments/v1alpha3/util.go index 3aae2d6c93c..722f0192a1a 100644 --- a/pkg/apis/controller/experiments/v1alpha3/util.go +++ b/pkg/apis/controller/experiments/v1alpha3/util.go @@ -93,6 +93,14 @@ func (exp *Experiment) IsCompleted() bool { return exp.IsSucceeded() || exp.IsFailed() } +func (exp *Experiment) IsCompletedReason(reason string) bool { + cond := getCondition(exp, ExperimentSucceeded) + if cond != nil && cond.Status == v1.ConditionTrue && cond.Reason == reason { + return true + } + return false +} + func (exp *Experiment) HasRunningTrials() bool { return exp.Status.TrialsRunning != 0 } @@ -131,6 +139,12 @@ func (exp *Experiment) MarkExperimentStatusRunning(reason, message string) { exp.setCondition(ExperimentRunning, v1.ConditionTrue, reason, message) } +func (exp *Experiment) MarkExperimentStatusRestarting(reason, message string) { + exp.removeCondition(ExperimentSucceeded) + exp.removeCondition(ExperimentFailed) + exp.setCondition(ExperimentRestarting, v1.ConditionTrue, reason, message) +} + func (exp *Experiment) MarkExperimentStatusSucceeded(reason, message string) { currentCond := getCondition(exp, ExperimentRunning) if currentCond != nil { diff --git a/pkg/controller.v1alpha3/experiment/experiment_controller.go b/pkg/controller.v1alpha3/experiment/experiment_controller.go index 4a38900325c..bc096eaa5af 100644 --- a/pkg/controller.v1alpha3/experiment/experiment_controller.go +++ b/pkg/controller.v1alpha3/experiment/experiment_controller.go @@ -189,8 +189,23 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re return r.updateFinalizers(instance, finalizers) } - if instance.IsCompleted() && !instance.HasRunningTrials() { - return reconcile.Result{}, nil + if instance.IsCompleted() { + // Check if completed instance is restartable + // Experiment is restartable only if it is in succeeded state by reaching max trials + if util.IsCompletedExperimentRestartable(instance) { + // Check if max trials is reconfigured + if (instance.Spec.MaxTrialCount != nil && + *instance.Spec.MaxTrialCount != instance.Status.Trials) || + (instance.Spec.MaxTrialCount == nil && instance.Status.Trials != 0) { + msg := "Experiment is restarted" + instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg) + } + } else { + // If experiment is completed with no running trials, stop reconcile + if !instance.HasRunningTrials() { + return reconcile.Result{}, nil + } + } } if !instance.IsCreated() { if instance.Status.StartTime == nil { diff --git a/pkg/controller.v1alpha3/experiment/util/status_util.go b/pkg/controller.v1alpha3/experiment/util/status_util.go index 41cf7ca6ebe..39a793f348a 100644 --- a/pkg/controller.v1alpha3/experiment/util/status_util.go +++ b/pkg/controller.v1alpha3/experiment/util/status_util.go @@ -27,11 +27,14 @@ import ( var log = logf.Log.WithName("experiment-status-util") const ( - ExperimentCreatedReason = "ExperimentCreated" - ExperimentRunningReason = "ExperimentRunning" - ExperimentSucceededReason = "ExperimentSucceeded" - ExperimentFailedReason = "ExperimentFailed" - ExperimentKilledReason = "ExperimentKilled" + ExperimentCreatedReason = "ExperimentCreated" + ExperimentRunningReason = "ExperimentRunning" + ExperimentRestartingReason = "ExperimentRestarting" + ExperimentGoalReachedReason = "ExperimentGoalReached" + ExperimentMaxTrialsReachedReason = "ExperimentMaxTrialsReached" + ExperimentSuggestionEndReachedReason = "ExperimentSuggestionEndReached" + ExperimentFailedReason = "ExperimentFailed" + ExperimentKilledReason = "ExperimentKilled" ) func UpdateExperimentStatus(collector *ExperimentsCollector, instance *experimentsv1alpha3.Experiment, trials *trialsv1alpha3.TrialList) error { @@ -143,7 +146,7 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance * if isObjectiveGoalReached { msg := "Experiment has succeeded because Objective goal has reached" - instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg) + instance.MarkExperimentStatusSucceeded(ExperimentGoalReachedReason, msg) instance.Status.CompletionTime = &now collector.IncreaseExperimentsSucceededCount(instance.Namespace) return @@ -151,7 +154,7 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance * if (instance.Spec.MaxTrialCount != nil) && (completedTrialsCount >= *instance.Spec.MaxTrialCount) { msg := "Experiment has succeeded because max trial count has reached" - instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg) + instance.MarkExperimentStatusSucceeded(ExperimentMaxTrialsReachedReason, msg) instance.Status.CompletionTime = &now collector.IncreaseExperimentsSucceededCount(instance.Namespace) return @@ -159,7 +162,7 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance * if getSuggestionDone && (instance.Status.TrialsPending+instance.Status.TrialsRunning) == 0 { msg := "Experiment has succeeded because suggestion service has reached the end" - instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg) + instance.MarkExperimentStatusSucceeded(ExperimentSuggestionEndReachedReason, msg) instance.Status.CompletionTime = &now collector.IncreaseExperimentsSucceededCount(instance.Namespace) return @@ -176,3 +179,10 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance * msg := "Experiment is running" instance.MarkExperimentStatusRunning(ExperimentRunningReason, msg) } + +func IsCompletedExperimentRestartable(instance *experimentsv1alpha3.Experiment) bool { + if instance.IsSucceeded() && instance.IsCompletedReason(ExperimentMaxTrialsReachedReason) { + return true + } + return false +} diff --git a/pkg/mock/v1alpha3/util/katibclient/katibclient.go b/pkg/mock/v1alpha3/util/katibclient/katibclient.go index 95d452683d5..481791320e4 100644 --- a/pkg/mock/v1alpha3/util/katibclient/katibclient.go +++ b/pkg/mock/v1alpha3/util/katibclient/katibclient.go @@ -218,6 +218,25 @@ func (mr *MockClientMockRecorder) InjectClient(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InjectClient", reflect.TypeOf((*MockClient)(nil).InjectClient), arg0) } +// UpdateExperiment mocks base method +func (m *MockClient) UpdateExperiment(arg0 *v1alpha3.Experiment, arg1 ...string) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UpdateExperiment", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateExperiment indicates an expected call of UpdateExperiment +func (mr *MockClientMockRecorder) UpdateExperiment(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateExperiment", reflect.TypeOf((*MockClient)(nil).UpdateExperiment), varargs...) +} + // UpdateTrialTemplates mocks base method func (m *MockClient) UpdateTrialTemplates(arg0 map[string]string, arg1 ...string) error { m.ctrl.T.Helper() diff --git a/pkg/util/v1alpha3/katibclient/katib_client.go b/pkg/util/v1alpha3/katibclient/katib_client.go index bee33901cef..8e4fde72f18 100644 --- a/pkg/util/v1alpha3/katibclient/katib_client.go +++ b/pkg/util/v1alpha3/katibclient/katib_client.go @@ -36,6 +36,7 @@ type Client interface { GetClient() client.Client GetExperimentList(namespace ...string) (*experimentsv1alpha3.ExperimentList, error) CreateExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error + UpdateExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error DeleteExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error GetExperiment(name string, namespace ...string) (*experimentsv1alpha3.Experiment, error) GetConfigMap(name string, namespace ...string) (map[string]string, error) @@ -123,6 +124,14 @@ func (k *KatibClient) CreateExperiment(experiment *experimentsv1alpha3.Experimen return nil } +func (k *KatibClient) UpdateExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error { + + if err := k.client.Update(context.Background(), experiment); err != nil { + return err + } + return nil +} + func (k *KatibClient) DeleteExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error { if err := k.client.Delete(context.Background(), experiment); err != nil { diff --git a/test/e2e/v1alpha3/resume-e2e-experiment.go b/test/e2e/v1alpha3/resume-e2e-experiment.go new file mode 100644 index 00000000000..6439d19681e --- /dev/null +++ b/test/e2e/v1alpha3/resume-e2e-experiment.go @@ -0,0 +1,147 @@ +package main + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "os" + "time" + + k8syaml "k8s.io/apimachinery/pkg/util/yaml" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "sigs.k8s.io/controller-runtime/pkg/client" + + commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3" + experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3" + "github.com/kubeflow/katib/pkg/util/v1alpha3/katibclient" +) + +const ( + timeout = 30 * time.Minute +) + +func verifyResult(exp *experimentsv1alpha3.Experiment) (*float64, error) { + if len(exp.Status.CurrentOptimalTrial.ParameterAssignments) == 0 { + return nil, fmt.Errorf("Best parameter assignments not updated in status") + } + + if len(exp.Status.CurrentOptimalTrial.Observation.Metrics) == 0 { + return nil, fmt.Errorf("Best metrics not updated in status") + } + + metric := exp.Status.CurrentOptimalTrial.Observation.Metrics[0] + if metric.Name != exp.Spec.Objective.ObjectiveMetricName { + return nil, fmt.Errorf("Best objective metric not updated in status") + } + return &metric.Value, nil +} + +func main() { + if len(os.Args) != 2 { + log.Fatal("Experiment name is missing") + } + expName := os.Args[1] + b, err := ioutil.ReadFile(expName) + if err != nil { + log.Fatal("Error in reading file ", err) + } + exp := &experimentsv1alpha3.Experiment{} + buf := bytes.NewBufferString(string(b)) + if err = k8syaml.NewYAMLOrJSONDecoder(buf, 1024).Decode(exp); err != nil { + log.Fatal("Yaml decode error ", err) + } + kclient, err := katibclient.NewClient(client.Options{}) + if err != nil { + log.Fatal("NewClient for Katib failed: ", err) + } + exp, err = kclient.GetExperiment(exp.Name, exp.Namespace) + if err != nil { + log.Fatal("Get Experiment error. Experiment not created yet ", err) + } + if exp.Spec.Algorithm.AlgorithmName != "hyperband" { + // Hyperband will validate the parallel trial count, + // thus we should not change it. + var maxtrials int32 = 7 + var paralleltrials int32 = 3 + exp.Spec.MaxTrialCount = &maxtrials + exp.Spec.ParallelTrialCount = ¶lleltrials + } + err = kclient.UpdateExperiment(exp) + if err != nil { + log.Fatal("UpdateExperiment from YAML failed: ", err) + } + endTime := time.Now().Add(timeout) + for time.Now().Before(endTime) { + log.Printf("Waiting for Experiment %s to start running.", exp.Name) + exp, err = kclient.GetExperiment(exp.Name, exp.Namespace) + if err != nil { + log.Fatal("Get Experiment error ", err) + } + if exp.IsRunning() { + log.Printf("Experiment %v started running", exp.Name) + break + } + time.Sleep(5 * time.Second) + } + + for time.Now().Before(endTime) { + exp, err = kclient.GetExperiment(exp.Name, exp.Namespace) + if err != nil { + log.Fatal("Get Experiment error ", err) + } + log.Printf("Waiting for Experiment %s to finish.", exp.Name) + log.Printf(`Experiment %s's trials: %d trials, %d pending trials, +%d running trials, %d killed trials, %d succeeded trials, %d failed trials.`, + exp.Name, + exp.Status.Trials, exp.Status.TrialsPending, exp.Status.TrialsRunning, + exp.Status.TrialsKilled, exp.Status.TrialsSucceeded, exp.Status.TrialsFailed) + log.Printf("Optimal Trial for Experiment %s: %v", exp.Name, + exp.Status.CurrentOptimalTrial) + log.Printf("Experiment %s's conditions: %v", exp.Name, exp.Status.Conditions) + + suggestion, err := kclient.GetSuggestion(exp.Name, exp.Namespace) + if err != nil { + log.Printf("Get Suggestion error: %v", err) + } else { + log.Printf("Suggestion %s's conditions: %v", suggestion.Name, + suggestion.Status.Conditions) + log.Printf("Suggestion %s's suggestions: %v", suggestion.Name, + suggestion.Status.Suggestions) + } + if exp.IsCompleted() { + log.Printf("Experiment %v finished", exp.Name) + break + } + time.Sleep(20 * time.Second) + } + + if !exp.IsCompleted() { + log.Fatal("Experiment run timed out") + } + + metricVal, err := verifyResult(exp) + if err != nil { + log.Fatal(err) + } + if metricVal == nil { + log.Fatal("Metric value in CurrentOptimalTrial not populated") + } + + objectiveType := exp.Spec.Objective.Type + goal := *exp.Spec.Objective.Goal + if (objectiveType == commonv1alpha3.ObjectiveTypeMinimize && *metricVal < goal) || + (objectiveType == commonv1alpha3.ObjectiveTypeMaximize && *metricVal > goal) { + log.Print("Objective Goal reached") + } else { + + if exp.Status.Trials != *exp.Spec.MaxTrialCount { + log.Fatal("All trials are not run in the experiment ", exp.Status.Trials, exp.Spec.MaxTrialCount) + } + + if exp.Status.TrialsSucceeded != *exp.Spec.MaxTrialCount { + log.Fatal("All trials are not successful ", exp.Status.TrialsSucceeded, *exp.Spec.MaxTrialCount) + } + } + log.Printf("Experiment has recorded best current Optimal Trial %v", exp.Status.CurrentOptimalTrial) +} diff --git a/test/scripts/v1alpha3/check-katib-ready.sh b/test/scripts/v1alpha3/check-katib-ready.sh index 909c4717db1..12ec1b1f524 100755 --- a/test/scripts/v1alpha3/check-katib-ready.sh +++ b/test/scripts/v1alpha3/check-katib-ready.sh @@ -140,7 +140,8 @@ kubectl -n kubeflow get pod cd ${GO_DIR}/test/e2e/v1alpha3 echo "Building run-e2e-experiment for e2e test cases" -go build -o run-e2e-experiment github.com/kubeflow/katib/test/e2e/v1alpha3 +go build -o run-e2e-experiment ./run-e2e-experiment.go +go build -o resume-e2e-experiment ./resume-e2e-experiment.go kubectl apply -f valid-experiment.yaml kubectl delete -f valid-experiment.yaml diff --git a/test/scripts/v1alpha3/run-suggestion-random.sh b/test/scripts/v1alpha3/run-suggestion-random.sh index fd1f7083537..12995948e6c 100755 --- a/test/scripts/v1alpha3/run-suggestion-random.sh +++ b/test/scripts/v1alpha3/run-suggestion-random.sh @@ -57,6 +57,8 @@ cd ${GO_DIR}/test/e2e/v1alpha3 echo "Running e2e hyperopt random experiment" export KUBECONFIG=$HOME/.kube/config ./run-e2e-experiment ../../../examples/v1alpha3/random-example.yaml +echo "Resuming the completed random experiment" +./resume-e2e-experiment ../../../examples/v1alpha3/random-example.yaml kubectl -n kubeflow describe suggestion kubectl -n kubeflow delete experiment random-example kubectl describe pods