Skip to content

Commit

Permalink
Refactor studyjobcontroller (kubeflow#254)
Browse files Browse the repository at this point in the history
* Refactor studyjob controller

* Refactor

* Go format files

* More refactor

* Rename studyjobcontroller to studyjob
  • Loading branch information
richardsliu authored and k8s-ci-robot committed Nov 20, 2018
1 parent 597064a commit e5e2dcd
Show file tree
Hide file tree
Showing 5 changed files with 469 additions and 396 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/add_studyjobcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ limitations under the License.
package controller

import (
"github.com/kubeflow/katib/pkg/controller/studyjobcontroller"
"github.com/kubeflow/katib/pkg/controller/studyjob"
)

// init appends the study job controller's Add function to AddToManagerFuncs
// when this package is loaded.
// NOTE(review): this excerpt is a rendered diff without +/- markers, so both
// the studyjobcontroller.Add and studyjob.Add append lines appear below;
// presumably only the studyjob.Add line exists after the rename — confirm.
func init() {
// AddToManagerFuncs is a list of functions to create controllers and add them to a manager.
AddToManagerFuncs = append(AddToManagerFuncs, studyjobcontroller.Add)
AddToManagerFuncs = append(AddToManagerFuncs, studyjob.Add)
}
213 changes: 213 additions & 0 deletions pkg/controller/studyjob/katib_api_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package studyjob

import (
"context"
"log"

"github.com/kubeflow/katib/pkg"
katibapi "github.com/kubeflow/katib/pkg/api"
katibv1alpha1 "github.com/kubeflow/katib/pkg/api/operators/apis/studyjob/v1alpha1"
"google.golang.org/grpc"
)

// initializeStudy registers the study described by instance with the katib
// manager and records the outcome on instance.Status.
//
// An empty SuggestionAlgorithm is defaulted to "random". After the study is
// created, a "SuggestionCount" = "0" entry is appended to the spec's
// suggestion parameters and the whole set is sent to the manager. On success
// the status receives the study ID and suggestion parameter ID, its
// SuggestionCount is incremented, and its condition is set to running.
//
// A missing SuggestionSpec, or a failure to dial the manager, marks the study
// as failed and returns nil. Errors from building the study config, creating
// the study, or setting the suggestion parameters are returned to the caller.
//
// ns is accepted for interface compatibility but is not used here.
func initializeStudy(instance *katibv1alpha1.StudyJob, ns string) error {
	sspec := instance.Spec.SuggestionSpec
	if sspec == nil {
		instance.Status.Condition = katibv1alpha1.ConditionFailed
		return nil
	}
	if sspec.SuggestionAlgorithm == "" {
		sspec.SuggestionAlgorithm = "random"
	}
	instance.Status.Condition = katibv1alpha1.ConditionRunning

	conn, err := grpc.Dial(pkg.ManagerAddr, grpc.WithInsecure())
	if err != nil {
		log.Printf("Connect katib manager error %v", err)
		instance.Status.Condition = katibv1alpha1.ConditionFailed
		return nil
	}
	defer conn.Close()
	c := katibapi.NewManagerClient(conn)

	studyConfig, err := getStudyConf(instance)
	if err != nil {
		return err
	}

	log.Printf("Create Study %s", studyConfig.Name)
	studyID, err := createStudy(c, studyConfig)
	if err != nil {
		return err
	}
	instance.Status.StudyID = studyID
	log.Printf("Study: %s Suggestion Spec %v", studyID, sspec)

	// sspec is guaranteed non-nil by the guard at the top of the function, so
	// the parameter is appended directly to the spec held by instance.
	sspec.SuggestionParameters = append(sspec.SuggestionParameters,
		katibapi.SuggestionParameter{
			Name:  "SuggestionCount",
			Value: "0",
		})
	sPID, err := setSuggestionParam(c, studyID, sspec)
	if err != nil {
		return err
	}
	instance.Status.SuggestionParameterID = sPID
	instance.Status.SuggestionCount++
	instance.Status.Condition = katibv1alpha1.ConditionRunning
	return nil
}

// getStudyConf builds the katib API StudyConfig that corresponds to the spec
// of the given StudyJob.
//
// Name, owner, objective value name, optimization goal (when set), metrics
// names and parameter configs are copied from instance.Spec; JobId is taken
// from the instance UID. An optimization type other than minimize or maximize
// maps to UNKNOWN_OPTIMIZATION. A parameter type matching none of the listed
// cases leaves ParameterType at its zero value.
//
// The returned error is always nil.
func getStudyConf(instance *katibv1alpha1.StudyJob) (*katibapi.StudyConfig, error) {
	conf := &katibapi.StudyConfig{
		Name:               instance.Spec.StudyName,
		Owner:              instance.Spec.Owner,
		ObjectiveValueName: instance.Spec.ObjectiveValueName,
		Metrics:            append([]string{}, instance.Spec.MetricsNames...),
		ParameterConfigs: &katibapi.StudyConfig_ParameterConfigs{
			Configs: make([]*katibapi.ParameterConfig, 0, len(instance.Spec.ParameterConfigs)),
		},
		JobId: string(instance.UID),
	}
	if goal := instance.Spec.OptimizationGoal; goal != nil {
		conf.OptimizationGoal = *goal
	}

	switch instance.Spec.OptimizationType {
	case katibv1alpha1.OptimizationTypeMinimize:
		conf.OptimizationType = katibapi.OptimizationType_MINIMIZE
	case katibv1alpha1.OptimizationTypeMaximize:
		conf.OptimizationType = katibapi.OptimizationType_MAXIMIZE
	default:
		conf.OptimizationType = katibapi.OptimizationType_UNKNOWN_OPTIMIZATION
	}

	for _, src := range instance.Spec.ParameterConfigs {
		dst := &katibapi.ParameterConfig{
			Name: src.Name,
			Feasible: &katibapi.FeasibleSpace{
				Min:  src.Feasible.Min,
				Max:  src.Feasible.Max,
				List: src.Feasible.List,
			},
		}
		switch src.ParameterType {
		case katibv1alpha1.ParameterTypeUnknown:
			dst.ParameterType = katibapi.ParameterType_UNKNOWN_TYPE
		case katibv1alpha1.ParameterTypeDouble:
			dst.ParameterType = katibapi.ParameterType_DOUBLE
		case katibv1alpha1.ParameterTypeInt:
			dst.ParameterType = katibapi.ParameterType_INT
		case katibv1alpha1.ParameterTypeDiscrete:
			dst.ParameterType = katibapi.ParameterType_DISCRETE
		case katibv1alpha1.ParameterTypeCategorical:
			dst.ParameterType = katibapi.ParameterType_CATEGORICAL
		}
		conf.ParameterConfigs.Configs = append(conf.ParameterConfigs.Configs, dst)
	}
	return conf, nil
}

// createStudy asks the katib manager to create a study from studyConfig and
// returns the new study's ID.
//
// After creation the study is fetched back so its stored configuration can be
// logged. A failure of either call is logged and returned together with an
// empty ID.
func createStudy(c katibapi.ManagerClient, studyConfig *katibapi.StudyConfig) (string, error) {
	ctx := context.Background()

	created, err := c.CreateStudy(ctx, &katibapi.CreateStudyRequest{
		StudyConfig: studyConfig,
	})
	if err != nil {
		log.Printf("CreateStudy Error %v", err)
		return "", err
	}
	id := created.StudyId
	log.Printf("Study ID %s", id)

	// Read the study back purely to log the configuration the manager stored.
	fetched, err := c.GetStudy(ctx, &katibapi.GetStudyRequest{
		StudyId: id,
	})
	if err != nil {
		log.Printf("Study: %s GetConfig Error %v", id, err)
		return "", err
	}
	log.Printf("Study ID %s StudyConf%v", id, fetched.StudyConfig)
	return id, nil
}

// setSuggestionParam sends the suggestion parameters of suggestionSpec to the
// katib manager for the given study and returns the parameter ID from the
// reply.
//
// When suggestionSpec.SuggestionParameters is nil no request is made and an
// empty ID with a nil error is returned. A failed request is logged and its
// error returned with an empty ID.
func setSuggestionParam(c katibapi.ManagerClient, studyID string, suggestionSpec *katibv1alpha1.SuggestionSpec) (string, error) {
	if suggestionSpec.SuggestionParameters == nil {
		return "", nil
	}

	req := &katibapi.SetSuggestionParametersRequest{
		StudyId:             studyID,
		SuggestionAlgorithm: suggestionSpec.SuggestionAlgorithm,
	}
	for _, param := range suggestionSpec.SuggestionParameters {
		entry := &katibapi.SuggestionParameter{
			Name:  param.Name,
			Value: param.Value,
		}
		req.SuggestionParameters = append(req.SuggestionParameters, entry)
	}

	reply, err := c.SetSuggestionParameters(context.Background(), req)
	if err != nil {
		log.Printf("Study %s SetConfig Error %v", studyID, err)
		return "", err
	}
	log.Printf("Study: %s setSuggesitonParameterReply %v", studyID, reply)
	return reply.ParamId, nil
}

// getSuggestionParam fetches from the katib manager the suggestion parameters
// stored under paramID. On failure it returns nil and the request error.
func getSuggestionParam(c katibapi.ManagerClient, paramID string) ([]*katibapi.SuggestionParameter, error) {
	reply, err := c.GetSuggestionParameters(
		context.Background(),
		&katibapi.GetSuggestionParametersRequest{ParamId: paramID},
	)
	if err != nil {
		return nil, err
	}
	return reply.SuggestionParameters, nil
}

// getSuggestion requests suggestions from the katib manager for the given
// study, using the algorithm and request number from suggestionSpec and the
// parameter set identified by sParamID.
//
// Each trial in the reply is logged. A failed request is logged and returned
// with a nil reply.
func getSuggestion(c katibapi.ManagerClient, studyID string, suggestionSpec *katibv1alpha1.SuggestionSpec, sParamID string) (*katibapi.GetSuggestionsReply, error) {
	req := &katibapi.GetSuggestionsRequest{
		StudyId:             studyID,
		SuggestionAlgorithm: suggestionSpec.SuggestionAlgorithm,
		// RequestNumber=0 means get all grids.
		RequestNumber: int32(suggestionSpec.RequestNumber),
		ParamId:       sParamID,
	}

	reply, err := c.GetSuggestions(context.Background(), req)
	if err != nil {
		log.Printf("Study: %s GetSuggestion Error %v", studyID, err)
		return nil, err
	}

	log.Printf("Study: %s CreatedTrials :", studyID)
	for _, trial := range reply.Trials {
		log.Printf("\t%v", trial)
	}
	return reply, nil
}
151 changes: 151 additions & 0 deletions pkg/controller/studyjob/manifest_parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package studyjob

import (
"bytes"
"context"
"fmt"
"log"
"text/template"

katibapi "github.com/kubeflow/katib/pkg/api"
katibv1alpha1 "github.com/kubeflow/katib/pkg/api/operators/apis/studyjob/v1alpha1"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
)

// getWorkerKind renders the worker template in dry-run mode with a placeholder
// "validation" trial, decodes the result as YAML or JSON, and returns the
// value of its top-level "kind" field.
//
// An error is returned when the manifest cannot be rendered or decoded, when
// the decoded document is not a map, or when "kind" is missing or not a string.
func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) {
	const bufSize = 1024

	placeholder := &katibapi.Trial{
		TrialId:      "validation",
		ParameterSet: []*katibapi.Parameter{},
	}
	// Dry run: no manager client is needed because no worker is registered.
	_, manifest, err := getWorkerManifest(nil, "validation", placeholder, workerSpec, "", true)
	if err != nil {
		return "", err
	}

	var typeChecker interface{}
	if err := k8syaml.NewYAMLOrJSONDecoder(manifest, bufSize).Decode(&typeChecker); err != nil {
		log.Printf("Yaml decode validation error %v", err)
		return "", err
	}

	if doc, isMap := typeChecker.(map[string]interface{}); isMap {
		// A missing key yields a nil interface, which fails the string
		// assertion just like a non-string value does.
		if kind, isString := doc["kind"].(string); isString {
			return kind, nil
		}
	}
	return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker)
}

// getWorkerManifest renders the worker manifest for a trial and returns the
// worker ID together with the rendered bytes.
//
// The template comes from workerSpec.GoTemplate.RawTemplate if set, otherwise
// from workerSpec.GoTemplate.TemplatePath if set, otherwise from
// /worker-template/defaultWorkerTemplate.yaml. The template is parsed before
// any worker is registered.
//
// With dryrun set the worker ID is the literal "validation" and c is not
// used; otherwise a pending worker of the given kind is registered through c
// and the ID from the reply is used. The template is executed with a
// WorkerInstance holding the study, trial and worker IDs plus the trial's
// parameter set.
func getWorkerManifest(c katibapi.ManagerClient, studyID string, trial *katibapi.Trial, workerSpec *katibv1alpha1.WorkerSpec, kind string, dryrun bool) (string, *bytes.Buffer, error) {
	var (
		tmpl *template.Template
		err  error
	)
	if workerSpec != nil {
		switch {
		case workerSpec.GoTemplate.RawTemplate != "":
			tmpl, err = template.New("Worker").Parse(workerSpec.GoTemplate.RawTemplate)
		case workerSpec.GoTemplate.TemplatePath != "":
			tmpl, err = template.ParseFiles(workerSpec.GoTemplate.TemplatePath)
		}
		if err != nil {
			return "", nil, err
		}
	}
	if tmpl == nil {
		// No template was configured: fall back to the default template file.
		if tmpl, err = template.ParseFiles("/worker-template/defaultWorkerTemplate.yaml"); err != nil {
			return "", nil, err
		}
	}

	workerID := "validation"
	if !dryrun {
		reply, regErr := c.RegisterWorker(context.Background(), &katibapi.RegisterWorkerRequest{
			Worker: &katibapi.Worker{
				StudyId: studyID,
				TrialId: trial.TrialId,
				Status:  katibapi.State_PENDING,
				Type:    kind,
			},
		})
		if regErr != nil {
			return "", nil, regErr
		}
		workerID = reply.WorkerId
	}

	values := WorkerInstance{
		StudyID:  studyID,
		TrialID:  trial.TrialId,
		WorkerID: workerID,
	}
	for _, param := range trial.ParameterSet {
		values.HyperParameters = append(values.HyperParameters, param)
	}

	rendered := &bytes.Buffer{}
	if err = tmpl.Execute(rendered, values); err != nil {
		return "", nil, err
	}
	return workerID, rendered, nil
}

// getMetricsCollectorManifest renders the metrics collector manifest for one
// worker and returns the rendered bytes.
//
// The template comes from mcs.GoTemplate.RawTemplate if set, otherwise from
// mcs.GoTemplate.TemplatePath if set, otherwise from
// /metricscollector-template/defaultMetricsCollectorTemplate.yaml. It is
// executed with a map exposing the StudyID, TrialID, WorkerID and NameSpace
// keys.
func getMetricsCollectorManifest(studyID string, trialID string, workerID string, namespace string, mcs *katibv1alpha1.MetricsCollectorSpec) (*bytes.Buffer, error) {
	var (
		tmpl *template.Template
		err  error
	)
	if mcs != nil {
		switch {
		case mcs.GoTemplate.RawTemplate != "":
			tmpl, err = template.New("MetricsCollector").Parse(mcs.GoTemplate.RawTemplate)
		case mcs.GoTemplate.TemplatePath != "":
			tmpl, err = template.ParseFiles(mcs.GoTemplate.TemplatePath)
		}
		if err != nil {
			return nil, err
		}
	}
	if tmpl == nil {
		// No template was configured: fall back to the default template file.
		if tmpl, err = template.ParseFiles("/metricscollector-template/defaultMetricsCollectorTemplate.yaml"); err != nil {
			return nil, err
		}
	}

	values := map[string]string{
		"StudyID":   studyID,
		"TrialID":   trialID,
		"WorkerID":  workerID,
		"NameSpace": namespace,
	}
	rendered := &bytes.Buffer{}
	if err = tmpl.Execute(rendered, values); err != nil {
		return nil, err
	}
	return rendered, nil
}
Loading

0 comments on commit e5e2dcd

Please sign in to comment.