Skip to content

Commit

Permalink
Resume experiment with extra trials from last checkpoint (#952)
Browse files Browse the repository at this point in the history
* Resuming experiment with extra trials

* Resuming experiment with extra trials

* Adding test script

* relative path

* Verify if experiment is running again

* Adding case when maxtrials is not set
  • Loading branch information
johnugeorge authored and k8s-ci-robot committed Dec 9, 2019
1 parent cb7d3e7 commit 4a97e21
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 11 deletions.
14 changes: 14 additions & 0 deletions pkg/apis/controller/experiments/v1alpha3/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ func (exp *Experiment) IsCompleted() bool {
return exp.IsSucceeded() || exp.IsFailed()
}

// IsCompletedReason reports whether the experiment has an ExperimentSucceeded
// condition whose status is true and whose reason equals the given reason.
// Only the succeeded condition is inspected.
func (exp *Experiment) IsCompletedReason(reason string) bool {
	succeeded := getCondition(exp, ExperimentSucceeded)
	if succeeded == nil {
		// No succeeded condition recorded, so no reason can match.
		return false
	}
	return succeeded.Status == v1.ConditionTrue && succeeded.Reason == reason
}

// HasRunningTrials reports whether the experiment status records a non-zero
// number of running trials.
func (exp *Experiment) HasRunningTrials() bool {
	running := exp.Status.TrialsRunning
	return running != 0
}
Expand Down Expand Up @@ -131,6 +139,12 @@ func (exp *Experiment) MarkExperimentStatusRunning(reason, message string) {
exp.setCondition(ExperimentRunning, v1.ConditionTrue, reason, message)
}

// MarkExperimentStatusRestarting removes the ExperimentSucceeded and
// ExperimentFailed conditions from the experiment and then sets the
// ExperimentRestarting condition to true with the given reason and message.
func (exp *Experiment) MarkExperimentStatusRestarting(reason, message string) {
// Drop both terminal conditions before flagging the restart.
// NOTE(review): presumably this is what stops the experiment from being
// reported as completed afterwards — confirm against IsSucceeded/IsFailed.
exp.removeCondition(ExperimentSucceeded)
exp.removeCondition(ExperimentFailed)
exp.setCondition(ExperimentRestarting, v1.ConditionTrue, reason, message)
}

func (exp *Experiment) MarkExperimentStatusSucceeded(reason, message string) {
currentCond := getCondition(exp, ExperimentRunning)
if currentCond != nil {
Expand Down
19 changes: 17 additions & 2 deletions pkg/controller.v1alpha3/experiment/experiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,23 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re
return r.updateFinalizers(instance, finalizers)
}

if instance.IsCompleted() && !instance.HasRunningTrials() {
return reconcile.Result{}, nil
if instance.IsCompleted() {
// Check if completed instance is restartable
// Experiment is restartable only if it is in succeeded state by reaching max trials
if util.IsCompletedExperimentRestartable(instance) {
// Check if max trials is reconfigured
if (instance.Spec.MaxTrialCount != nil &&
*instance.Spec.MaxTrialCount != instance.Status.Trials) ||
(instance.Spec.MaxTrialCount == nil && instance.Status.Trials != 0) {
msg := "Experiment is restarted"
instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg)
}
} else {
// If experiment is completed with no running trials, stop reconcile
if !instance.HasRunningTrials() {
return reconcile.Result{}, nil
}
}
}
if !instance.IsCreated() {
if instance.Status.StartTime == nil {
Expand Down
26 changes: 18 additions & 8 deletions pkg/controller.v1alpha3/experiment/util/status_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ import (
var log = logf.Log.WithName("experiment-status-util")

const (
ExperimentCreatedReason = "ExperimentCreated"
ExperimentRunningReason = "ExperimentRunning"
ExperimentSucceededReason = "ExperimentSucceeded"
ExperimentFailedReason = "ExperimentFailed"
ExperimentKilledReason = "ExperimentKilled"
ExperimentCreatedReason = "ExperimentCreated"
ExperimentRunningReason = "ExperimentRunning"
ExperimentRestartingReason = "ExperimentRestarting"
ExperimentGoalReachedReason = "ExperimentGoalReached"
ExperimentMaxTrialsReachedReason = "ExperimentMaxTrialsReached"
ExperimentSuggestionEndReachedReason = "ExperimentSuggestionEndReached"
ExperimentFailedReason = "ExperimentFailed"
ExperimentKilledReason = "ExperimentKilled"
)

func UpdateExperimentStatus(collector *ExperimentsCollector, instance *experimentsv1alpha3.Experiment, trials *trialsv1alpha3.TrialList) error {
Expand Down Expand Up @@ -143,23 +146,23 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance *

if isObjectiveGoalReached {
msg := "Experiment has succeeded because Objective goal has reached"
instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg)
instance.MarkExperimentStatusSucceeded(ExperimentGoalReachedReason, msg)
instance.Status.CompletionTime = &now
collector.IncreaseExperimentsSucceededCount(instance.Namespace)
return
}

if (instance.Spec.MaxTrialCount != nil) && (completedTrialsCount >= *instance.Spec.MaxTrialCount) {
msg := "Experiment has succeeded because max trial count has reached"
instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg)
instance.MarkExperimentStatusSucceeded(ExperimentMaxTrialsReachedReason, msg)
instance.Status.CompletionTime = &now
collector.IncreaseExperimentsSucceededCount(instance.Namespace)
return
}

if getSuggestionDone && (instance.Status.TrialsPending+instance.Status.TrialsRunning) == 0 {
msg := "Experiment has succeeded because suggestion service has reached the end"
instance.MarkExperimentStatusSucceeded(ExperimentSucceededReason, msg)
instance.MarkExperimentStatusSucceeded(ExperimentSuggestionEndReachedReason, msg)
instance.Status.CompletionTime = &now
collector.IncreaseExperimentsSucceededCount(instance.Namespace)
return
Expand All @@ -176,3 +179,10 @@ func UpdateExperimentStatusCondition(collector *ExperimentsCollector, instance *
msg := "Experiment is running"
instance.MarkExperimentStatusRunning(ExperimentRunningReason, msg)
}

// IsCompletedExperimentRestartable reports whether the given experiment is in
// the succeeded state and its succeeded condition carries
// ExperimentMaxTrialsReachedReason. Experiments that succeeded for any other
// reason, or that did not succeed, are not restartable.
func IsCompletedExperimentRestartable(instance *experimentsv1alpha3.Experiment) bool {
	return instance.IsSucceeded() && instance.IsCompletedReason(ExperimentMaxTrialsReachedReason)
}
19 changes: 19 additions & 0 deletions pkg/mock/v1alpha3/util/katibclient/katibclient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/util/v1alpha3/katibclient/katib_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Client interface {
GetClient() client.Client
GetExperimentList(namespace ...string) (*experimentsv1alpha3.ExperimentList, error)
CreateExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error
UpdateExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error
DeleteExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error
GetExperiment(name string, namespace ...string) (*experimentsv1alpha3.Experiment, error)
GetConfigMap(name string, namespace ...string) (map[string]string, error)
Expand Down Expand Up @@ -123,6 +124,14 @@ func (k *KatibClient) CreateExperiment(experiment *experimentsv1alpha3.Experimen
return nil
}

// UpdateExperiment sends the given experiment object to the underlying client
// as an update and returns whatever error that call produces.
// The variadic namespace argument is accepted but not used by this method.
func (k *KatibClient) UpdateExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error {
	return k.client.Update(context.Background(), experiment)
}

func (k *KatibClient) DeleteExperiment(experiment *experimentsv1alpha3.Experiment, namespace ...string) error {

if err := k.client.Delete(context.Background(), experiment); err != nil {
Expand Down
147 changes: 147 additions & 0 deletions test/e2e/v1alpha3/resume-e2e-experiment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package main

import (
"bytes"
"fmt"
"io/ioutil"
"log"
"os"
"time"

k8syaml "k8s.io/apimachinery/pkg/util/yaml"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client"

commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
"github.com/kubeflow/katib/pkg/util/v1alpha3/katibclient"
)

// timeout bounds the total time main spends polling the experiment, covering
// both the wait for it to start running and the wait for it to complete.
const timeout = 30 * time.Minute

// verifyResult checks that the experiment status carries a usable current
// optimal trial and returns a pointer to the value of its first metric.
//
// It returns an error when the optimal trial has no parameter assignments,
// when it has no observed metrics, or when the first metric's name differs
// from the objective metric name in the experiment spec.
func verifyResult(exp *experimentsv1alpha3.Experiment) (*float64, error) {
	optimal := exp.Status.CurrentOptimalTrial

	switch {
	case len(optimal.ParameterAssignments) == 0:
		return nil, fmt.Errorf("Best parameter assignments not updated in status")
	case len(optimal.Observation.Metrics) == 0:
		return nil, fmt.Errorf("Best metrics not updated in status")
	}

	// Only the first recorded metric is compared against the objective.
	best := optimal.Observation.Metrics[0]
	if wanted := exp.Spec.Objective.ObjectiveMetricName; best.Name != wanted {
		return nil, fmt.Errorf("Best objective metric not updated in status")
	}

	value := best.Value
	return &value, nil
}

// main resumes an existing experiment with a larger trial budget and verifies
// that it runs to completion again.
//
// It expects exactly one command-line argument: the path to an experiment
// YAML/JSON file. The file is decoded only to learn the experiment's name and
// namespace; the live object is then fetched through the Katib client. For
// every algorithm except hyperband, MaxTrialCount is set to 7 and
// ParallelTrialCount to 3 before the experiment is updated. The function then
// polls until the experiment reports running, polls again until it reports
// completed (both bounded by the shared timeout), and finally validates the
// recorded optimal trial and trial counts. Any failure terminates the process
// through log.Fatal.
func main() {
	if len(os.Args) != 2 {
		log.Fatal("Experiment name is missing")
	}
	expName := os.Args[1]
	b, err := ioutil.ReadFile(expName)
	if err != nil {
		log.Fatal("Error in reading file ", err)
	}
	exp := &experimentsv1alpha3.Experiment{}
	// Decode straight from the file bytes; copying them into a string first
	// only to wrap them in a buffer again is unnecessary.
	if err = k8syaml.NewYAMLOrJSONDecoder(bytes.NewReader(b), 1024).Decode(exp); err != nil {
		log.Fatal("Yaml decode error ", err)
	}
	kclient, err := katibclient.NewClient(client.Options{})
	if err != nil {
		log.Fatal("NewClient for Katib failed: ", err)
	}
	// Replace the decoded object with the live one so the update below is
	// applied to the current state of the experiment.
	exp, err = kclient.GetExperiment(exp.Name, exp.Namespace)
	if err != nil {
		log.Fatal("Get Experiment error. Experiment not created yet ", err)
	}
	if exp.Spec.Algorithm.AlgorithmName != "hyperband" {
		// Hyperband will validate the parallel trial count,
		// thus we should not change it.
		var maxtrials int32 = 7
		var paralleltrials int32 = 3
		exp.Spec.MaxTrialCount = &maxtrials
		exp.Spec.ParallelTrialCount = &paralleltrials
	}
	err = kclient.UpdateExperiment(exp)
	if err != nil {
		log.Fatal("UpdateExperiment from YAML failed: ", err)
	}

	// Both polling loops share one deadline.
	endTime := time.Now().Add(timeout)
	for time.Now().Before(endTime) {
		log.Printf("Waiting for Experiment %s to start running.", exp.Name)
		exp, err = kclient.GetExperiment(exp.Name, exp.Namespace)
		if err != nil {
			log.Fatal("Get Experiment error ", err)
		}
		if exp.IsRunning() {
			log.Printf("Experiment %v started running", exp.Name)
			break
		}
		time.Sleep(5 * time.Second)
	}

	for time.Now().Before(endTime) {
		exp, err = kclient.GetExperiment(exp.Name, exp.Namespace)
		if err != nil {
			log.Fatal("Get Experiment error ", err)
		}
		log.Printf("Waiting for Experiment %s to finish.", exp.Name)
		log.Printf(`Experiment %s's trials: %d trials, %d pending trials,
%d running trials, %d killed trials, %d succeeded trials, %d failed trials.`,
			exp.Name,
			exp.Status.Trials, exp.Status.TrialsPending, exp.Status.TrialsRunning,
			exp.Status.TrialsKilled, exp.Status.TrialsSucceeded, exp.Status.TrialsFailed)
		log.Printf("Optimal Trial for Experiment %s: %v", exp.Name,
			exp.Status.CurrentOptimalTrial)
		log.Printf("Experiment %s's conditions: %v", exp.Name, exp.Status.Conditions)

		// A missing suggestion is only logged; it does not abort the wait.
		suggestion, err := kclient.GetSuggestion(exp.Name, exp.Namespace)
		if err != nil {
			log.Printf("Get Suggestion error: %v", err)
		} else {
			log.Printf("Suggestion %s's conditions: %v", suggestion.Name,
				suggestion.Status.Conditions)
			log.Printf("Suggestion %s's suggestions: %v", suggestion.Name,
				suggestion.Status.Suggestions)
		}
		if exp.IsCompleted() {
			log.Printf("Experiment %v finished", exp.Name)
			break
		}
		time.Sleep(20 * time.Second)
	}

	if !exp.IsCompleted() {
		log.Fatal("Experiment run timed out")
	}

	metricVal, err := verifyResult(exp)
	if err != nil {
		log.Fatal(err)
	}
	if metricVal == nil {
		log.Fatal("Metric value in CurrentOptimalTrial not populated")
	}

	objectiveType := exp.Spec.Objective.Type
	// Goal is a pointer and may be unset; treat a missing goal as "not
	// reached" instead of dereferencing nil.
	goalReached := false
	if goal := exp.Spec.Objective.Goal; goal != nil {
		goalReached = (objectiveType == commonv1alpha3.ObjectiveTypeMinimize && *metricVal < *goal) ||
			(objectiveType == commonv1alpha3.ObjectiveTypeMaximize && *metricVal > *goal)
	}

	switch {
	case goalReached:
		log.Print("Objective Goal reached")
	case exp.Spec.MaxTrialCount == nil:
		// The hyperband branch above leaves MaxTrialCount untouched, so it can
		// legitimately be nil here; there is no count to compare against.
		log.Print("MaxTrialCount is not set; skipping trial count verification")
	default:
		maxTrials := *exp.Spec.MaxTrialCount
		if exp.Status.Trials != maxTrials {
			// Log the dereferenced count, not the pointer address.
			log.Fatal("All trials are not run in the experiment ", exp.Status.Trials, maxTrials)
		}

		if exp.Status.TrialsSucceeded != maxTrials {
			log.Fatal("All trials are not successful ", exp.Status.TrialsSucceeded, maxTrials)
		}
	}
	log.Printf("Experiment has recorded best current Optimal Trial %v", exp.Status.CurrentOptimalTrial)
}
3 changes: 2 additions & 1 deletion test/scripts/v1alpha3/check-katib-ready.sh
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ kubectl -n kubeflow get pod
cd ${GO_DIR}/test/e2e/v1alpha3

echo "Building run-e2e-experiment for e2e test cases"
go build -o run-e2e-experiment github.com/kubeflow/katib/test/e2e/v1alpha3
go build -o run-e2e-experiment ./run-e2e-experiment.go
go build -o resume-e2e-experiment ./resume-e2e-experiment.go

kubectl apply -f valid-experiment.yaml
kubectl delete -f valid-experiment.yaml
Expand Down
2 changes: 2 additions & 0 deletions test/scripts/v1alpha3/run-suggestion-random.sh
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ cd ${GO_DIR}/test/e2e/v1alpha3
echo "Running e2e hyperopt random experiment"
export KUBECONFIG=$HOME/.kube/config
./run-e2e-experiment ../../../examples/v1alpha3/random-example.yaml
echo "Resuming the completed random experiment"
./resume-e2e-experiment ../../../examples/v1alpha3/random-example.yaml
kubectl -n kubeflow describe suggestion
kubectl -n kubeflow delete experiment random-example
kubectl describe pods
Expand Down

0 comments on commit 4a97e21

Please sign in to comment.