Delete Suggestion deployment after Experiment is finished #1150

Merged
3 changes: 3 additions & 0 deletions pkg/apis/controller/experiments/v1alpha3/constants.go
Expand Up @@ -24,4 +24,7 @@ const (

// Default value of Spec.TemplatePath
DefaultTrialTemplatePath = "defaultTrialTemplate.yaml"

// Default value of Spec.ResumePolicy
DefaultResumePolicy = LongRunning
)

11 changes: 10 additions & 1 deletion pkg/apis/controller/experiments/v1alpha3/experiment_types.go
Expand Up @@ -49,9 +49,11 @@ type ExperimentSpec struct {

NasConfig *NasConfig `json:"nasConfig,omitempty"`

// Describes the resuming policy, which usually takes effect after the experiment has terminated.
ResumePolicy ResumePolicyType `json:"resumePolicy,omitempty"`

// TODO - Other fields, exact format is TBD. Will add these back during implementation.
// - Early stopping
// - Resume experiment
}

type ExperimentStatus struct {
Expand Down Expand Up @@ -154,6 +156,13 @@ const (
ExperimentFailed ExperimentConditionType = "Failed"
)

type ResumePolicyType string

const (
NeverResume ResumePolicyType = "Never"
LongRunning ResumePolicyType = "LongRunning"
Member

I like the name ResumePolicy.
What do you think about the name LongRunning? Is it better than AlwaysRunning?
/cc @johnugeorge @gaocegege @richardsliu

Member

@gaocegege @johnugeorge @richardsliu Do you have any ideas about the naming?

Member

For consistency, how about keeping the options as "Never" and "Always"?

Member Author

@johnugeorge As proposed in #1062, there will be a third policy type representing "resuming experiment from persistent format".

Member

I agree with @sperlingxx.
When the Experiment is not deleted and is always running, ResumeExperiment: Always is not very obvious.

Member

SGTM. I think we can use LongRunning

)

type ParameterSpec struct {
Name string `json:"name,omitempty"`
ParameterType ParameterType `json:"parameterType,omitempty"`
Expand Down
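The ResumePolicy values discussed in the thread above can be illustrated with a small standalone sketch. It is not code from this PR: `effectivePolicy` and `shouldTerminateSuggestion` are hypothetical helpers, and the sketch assumes an empty policy is defaulted to LongRunning, as the DefaultResumePolicy constant suggests.

```go
package main

import "fmt"

// ResumePolicyType mirrors the string type added in this PR.
type ResumePolicyType string

const (
	NeverResume ResumePolicyType = "Never"
	LongRunning ResumePolicyType = "LongRunning"
)

// effectivePolicy is a hypothetical helper. It assumes an unset policy
// falls back to LongRunning (the value of DefaultResumePolicy).
func effectivePolicy(p ResumePolicyType) ResumePolicyType {
	if p == "" {
		return LongRunning
	}
	return p
}

// shouldTerminateSuggestion is a hypothetical helper. It reports whether the
// suggestion should be shut down: only when the experiment has completed and
// the effective policy is anything other than LongRunning.
func shouldTerminateSuggestion(p ResumePolicyType, experimentCompleted bool) bool {
	return experimentCompleted && effectivePolicy(p) != LongRunning
}

func main() {
	for _, p := range []ResumePolicyType{"", NeverResume, LongRunning} {
		fmt.Printf("policy=%q terminate=%v\n", p, shouldTerminateSuggestion(p, true))
	}
}
```

Keeping the decision in one predicate would leave room for the third policy mentioned in #1062 without touching the callers.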
7 changes: 7 additions & 0 deletions pkg/apis/v1alpha3/openapi_generated.go

4 changes: 4 additions & 0 deletions pkg/apis/v1alpha3/swagger.json
Expand Up @@ -485,6 +485,10 @@
"$ref": "#/definitions/v1alpha3.ParameterSpec"
}
},
"resumePolicy": {
"description": "Describes the resuming policy, which usually takes effect after the experiment has terminated.",
"type": "string"
},
"trialTemplate": {
"description": "Template for each run of the trial.",
"$ref": "#/definitions/v1alpha3.TrialTemplate"
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller.v1alpha3/experiment/experiment_controller.go
Expand Up @@ -202,6 +202,9 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re
instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg)
}
} else {
if instance.Spec.ResumePolicy != experimentsv1alpha3.LongRunning {
return r.terminateSuggestion(instance)
}
// If experiment is completed with no running trials, stop reconcile
if !instance.HasRunningTrials() {
return reconcile.Result{}, nil
Expand Down Expand Up @@ -263,6 +266,7 @@ func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha3.
if reconcileRequired {
r.ReconcileTrials(instance, trials.Items)
}

return nil
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/controller.v1alpha3/experiment/experiment_util.go
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -14,6 +15,7 @@ import (
experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
suggestionsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3"
trialsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
suggestionController "github.com/kubeflow/katib/pkg/controller.v1alpha3/suggestion"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/util"
)

Expand Down Expand Up @@ -109,3 +111,31 @@ func (r *ReconcileExperiment) updateFinalizers(instance *experimentsv1alpha3.Exp
return reconcile.Result{Requeue: true}, err
}
}

func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1alpha3.Experiment) (reconcile.Result, error) {
log.Info("Start terminating original...")
original := &suggestionsv1alpha3.Suggestion{}
err := r.Get(context.TODO(), types.NamespacedName{
Namespace: instance.Namespace,
Name: instance.Name,
}, original)
if err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
if original.IsCompleted() {
return reconcile.Result{}, nil
}
suggestion := original.DeepCopy()
msg := "Suggestion is succeeded"
suggestion.MarkSuggestionStatusSucceeded(suggestionController.SuggestionSucceededReason, msg)
log.Info("Mark suggestion succeeded...")

if err := r.UpdateSuggestion(suggestion); err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{Requeue: true}, nil
}
11 changes: 10 additions & 1 deletion pkg/controller.v1alpha3/suggestion/suggestion_controller.go
Expand Up @@ -134,7 +134,16 @@ func (r *ReconcileSuggestion) Reconcile(request reconcile.Request) (reconcile.Re
return reconcile.Result{}, err
}
instance := oldS.DeepCopy()
- if instance.IsCompleted() {
+ // If ResumePolicyType is LongRunning, the suggestion status will never be Succeeded.
+ if instance.IsSucceeded() {
+ err = r.deleteDeployment(instance)
+ if err != nil {
+ return reconcile.Result{}, err
+ }
+ err = r.deleteService(instance)
+ if err != nil {
+ return reconcile.Result{}, err
+ }
return reconcile.Result{}, nil
}
if !instance.IsCreated() {
Expand Down
45 changes: 45 additions & 0 deletions pkg/controller.v1alpha3/suggestion/suggestion_controller_util.go
Expand Up @@ -2,6 +2,7 @@ package suggestion

import (
"context"
"github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -34,3 +35,47 @@ func (r *ReconcileSuggestion) reconcileService(service *corev1.Service) (*corev1
}
return foundService, nil
}

func (r *ReconcileSuggestion) deleteDeployment(instance *v1alpha3.Suggestion) error {
deploy, err := r.DesiredDeployment(instance)
if err != nil {
return err
}
realDeploy := &appsv1.Deployment{}
err = r.Get(context.TODO(), types.NamespacedName{Name: deploy.Name, Namespace: deploy.Namespace}, realDeploy)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
err = r.Delete(context.TODO(), realDeploy)
if err != nil {
return err
}
// The structured logger takes key/value pairs and does not expand format verbs.
log.Info("Suggestion deployment has been deleted", "name", realDeploy.Name)

return nil
}

func (r *ReconcileSuggestion) deleteService(instance *v1alpha3.Suggestion) error {
service, err := r.DesiredService(instance)
if err != nil {
return err
}
realService := &corev1.Service{}
err = r.Get(context.TODO(), types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, realService)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
err = r.Delete(context.TODO(), realService)
if err != nil {
return err
}
// The structured logger takes key/value pairs and does not expand format verbs.
log.Info("Suggestion service has been deleted", "name", realService.Name)

return nil
}
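Both delete helpers above follow the same shape: look the object up, treat "not found" as success, and only then delete. The sketch below shows that pattern in isolation. It deliberately avoids the Kubernetes client: `fakeStore`, `errNotFound`, and `deleteIfExists` are hypothetical stand-ins, not names from this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical sentinel standing in for a Kubernetes
// NotFound API error.
var errNotFound = errors.New("not found")

// fakeStore is a hypothetical in-memory stand-in for the cluster client.
type fakeStore struct {
	objects map[string]bool
}

// Get returns errNotFound when no object with the given name exists.
func (s *fakeStore) Get(name string) error {
	if !s.objects[name] {
		return errNotFound
	}
	return nil
}

// Delete removes the named object, or returns errNotFound if it is absent.
func (s *fakeStore) Delete(name string) error {
	if !s.objects[name] {
		return errNotFound
	}
	delete(s.objects, name)
	return nil
}

// deleteIfExists deletes the named object and reports whether a deletion
// happened. A missing object is not an error, so repeated reconciles are safe.
func deleteIfExists(s *fakeStore, name string) (bool, error) {
	if err := s.Get(name); err != nil {
		if errors.Is(err, errNotFound) {
			return false, nil
		}
		return false, err
	}
	if err := s.Delete(name); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	s := &fakeStore{objects: map[string]bool{"random-suggestion": true}}
	for i := 0; i < 2; i++ {
		deleted, err := deleteIfExists(s, "random-suggestion")
		fmt.Println(deleted, err)
	}
}
```

Because the second call finds nothing and returns without error, a reconcile loop that runs again after the resources are gone does not fail or requeue forever.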
15 changes: 15 additions & 0 deletions pkg/webhook/v1alpha3/experiment/validator/validator.go
Expand Up @@ -68,6 +68,9 @@ func (g *DefaultValidator) ValidateExperiment(instance, oldInst *experimentsv1al
if err := g.validateAlgorithm(instance.Spec.Algorithm); err != nil {
return err
}
if err := g.validateResumePolicy(instance.Spec.ResumePolicy); err != nil {
return err
}

if err := g.validateTrialTemplate(instance); err != nil {
return err
Expand Down Expand Up @@ -115,6 +118,18 @@ func (g *DefaultValidator) validateAlgorithm(ag *commonapiv1alpha3.AlgorithmSpec
return nil
}

func (g *DefaultValidator) validateResumePolicy(resume experimentsv1alpha3.ResumePolicyType) error {
validTypes := map[experimentsv1alpha3.ResumePolicyType]string{
"": "",
experimentsv1alpha3.NeverResume: "",
experimentsv1alpha3.LongRunning: "",
}
if _, ok := validTypes[resume]; !ok {
return fmt.Errorf("invalid ResumePolicyType %s", resume)
}
return nil
}

func (g *DefaultValidator) validateTrialTemplate(instance *experimentsv1alpha3.Experiment) error {
trialName := fmt.Sprintf("%s-trial", instance.GetName())
runSpec, err := g.GetRunSpec(instance, instance.GetName(), trialName, instance.GetNamespace())
Expand Down
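As an aside on validateResumePolicy above: the map is used only as a set, so the same check can be written as a switch. This is a minimal standalone sketch of that alternative, not the code merged in this PR; the types are redeclared locally so the example compiles on its own.

```go
package main

import "fmt"

// ResumePolicyType mirrors the string type added in this PR.
type ResumePolicyType string

const (
	NeverResume ResumePolicyType = "Never"
	LongRunning ResumePolicyType = "LongRunning"
)

// validateResumePolicy accepts the empty string (an unset field) and the two
// named policies, and rejects everything else.
func validateResumePolicy(p ResumePolicyType) error {
	switch p {
	case "", NeverResume, LongRunning:
		return nil
	default:
		return fmt.Errorf("invalid ResumePolicyType %q", p)
	}
}

func main() {
	for _, p := range []ResumePolicyType{"", NeverResume, LongRunning, "invalid-policy"} {
		fmt.Printf("%q -> %v\n", p, validateResumePolicy(p))
	}
}
```

The switch avoids allocating a map on every call; either form rejects the "invalid-policy" value used in the test case.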
9 changes: 9 additions & 0 deletions pkg/webhook/v1alpha3/experiment/validator/validator_test.go
Expand Up @@ -251,6 +251,15 @@ metadata:
return i
}(),
},
{
// The invalid policy is set on Instance, since validateResumePolicy checks instance.Spec.ResumePolicy.
Instance: func() *experimentsv1alpha3.Experiment {
i := newFakeInstance()
i.Spec.ResumePolicy = "invalid-policy"
return i
}(),
Err: true,
},
{
Instance: func() *experimentsv1alpha3.Experiment {
i := newFakeInstance()
Expand Down
31 changes: 31 additions & 0 deletions test/e2e/v1alpha3/resume-e2e-experiment.go
Expand Up @@ -2,18 +2,24 @@ package main

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"log"
"os"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client"

commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
controllerUtil "github.com/kubeflow/katib/pkg/controller.v1alpha3/util"
"github.com/kubeflow/katib/pkg/util/v1alpha3/katibclient"
)

Expand Down Expand Up @@ -146,5 +152,30 @@ func main() {
log.Fatal("All trials are not successful ", exp.Status.TrialsSucceeded, *exp.Spec.MaxTrialCount)
}
}

sug, err := kclient.GetSuggestion(exp.Name, exp.Namespace)
if err != nil {
log.Fatal("Get Suggestion error ", err)
}
if exp.Spec.ResumePolicy == experimentsv1alpha3.LongRunning {
if sug.IsSucceeded() {
log.Fatal("Suggestion is terminated while ResumePolicy = LongRunning")
}
}
if exp.Spec.ResumePolicy == experimentsv1alpha3.NeverResume {
if sug.IsRunning() {
log.Fatal("Suggestion is still running while ResumePolicy = NeverResume")
}
namespacedName := types.NamespacedName{Name: controllerUtil.GetAlgorithmServiceName(sug), Namespace: sug.Namespace}
service := &corev1.Service{}
err := kclient.GetClient().Get(context.TODO(), namespacedName, service)
if err == nil || !errors.IsNotFound(err) {
log.Fatal("Suggestion service is still alive while ResumePolicy = NeverResume")
}
namespacedName = types.NamespacedName{Name: controllerUtil.GetAlgorithmDeploymentName(sug), Namespace: sug.Namespace}
deployment := &appsv1.Deployment{}
err = kclient.GetClient().Get(context.TODO(), namespacedName, deployment)
if err == nil || !errors.IsNotFound(err) {
log.Fatal("Suggestion deployment is still alive while ResumePolicy = NeverResume")
}
}

log.Printf("Experiment has recorded best current Optimal Trial %v", exp.Status.CurrentOptimalTrial)
}
31 changes: 31 additions & 0 deletions test/e2e/v1alpha3/run-e2e-experiment.go
Expand Up @@ -2,18 +2,24 @@ package main

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"log"
"os"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client"

commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
controllerUtil "github.com/kubeflow/katib/pkg/controller.v1alpha3/util"
"github.com/kubeflow/katib/pkg/util/v1alpha3/katibclient"
)

Expand Down Expand Up @@ -129,5 +135,30 @@ func main() {
log.Fatal("All trials are not successful ", exp.Status.TrialsSucceeded, *exp.Spec.MaxTrialCount)
}
}

sug, err := kclient.GetSuggestion(exp.Name, exp.Namespace)
if err != nil {
log.Fatal("Get Suggestion error ", err)
}
if exp.Spec.ResumePolicy == experimentsv1alpha3.LongRunning {
if sug.IsSucceeded() {
log.Fatal("Suggestion is terminated while ResumePolicy = LongRunning")
}
}
if exp.Spec.ResumePolicy == experimentsv1alpha3.NeverResume {
if sug.IsRunning() {
log.Fatal("Suggestion is still running while ResumePolicy = NeverResume")
}
namespacedName := types.NamespacedName{Name: controllerUtil.GetAlgorithmServiceName(sug), Namespace: sug.Namespace}
service := &corev1.Service{}
err := kclient.GetClient().Get(context.TODO(), namespacedName, service)
if err == nil || !errors.IsNotFound(err) {
log.Fatal("Suggestion service is still alive while ResumePolicy = NeverResume")
}
namespacedName = types.NamespacedName{Name: controllerUtil.GetAlgorithmDeploymentName(sug), Namespace: sug.Namespace}
deployment := &appsv1.Deployment{}
err = kclient.GetClient().Get(context.TODO(), namespacedName, deployment)
if err == nil || !errors.IsNotFound(err) {
log.Fatal("Suggestion deployment is still alive while ResumePolicy = NeverResume")
}
}

log.Printf("Experiment has recorded best current Optimal Trial %v", exp.Status.CurrentOptimalTrial)
}