Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: volcano pod group creation issue #1390

Merged
merged 1 commit into from
Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/training-operator.v1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@ func main() {
var enableLeaderElection bool
var probeAddr string
var enabledSchemes controller_v1.EnabledSchemes
var enableGangScheduling bool
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
flag.Var(&enabledSchemes, "enable-scheme", "Enable scheme(s) as --enable-scheme=tfjob --enable-scheme=pytorchjob, case insensitive."+
" Now supporting TFJob, PyTorchJob, MXNetJob, XGBoostJob. By default, all supported schemes will be enabled.")
flag.BoolVar(&enableGangScheduling, "enable-gang-scheduling", false, "Set true to enable gang scheduling")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -100,7 +102,7 @@ func main() {
"scheme not supported", "scheme", s)
os.Exit(1)
}
if err = setupFunc(mgr); err != nil {
if err = setupFunc(mgr, enableGangScheduling); err != nil {
setupLog.Error(err, "unable to create controller", "controller", s)
os.Exit(1)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ go 1.14

require (
github.com/go-logr/logr v0.3.0
github.com/go-openapi/spec v0.19.9
github.com/go-openapi/spec v0.20.3
github.com/kubeflow/common v0.3.4
github.com/onrik/logrus v0.2.2-0.20181225141908-a09d5cdcdc62
github.com/onsi/ginkgo v1.14.1
github.com/onsi/gomega v1.10.2
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_golang v1.10.0
github.com/sirupsen/logrus v1.6.0
k8s.io/api v0.19.9
k8s.io/apiextensions-apiserver v0.19.9
Expand Down
199 changes: 199 additions & 0 deletions go.sum

Large diffs are not rendered by default.

31 changes: 16 additions & 15 deletions pkg/controller.v1/mxnet/mxjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"reflect"
"time"

"k8s.io/client-go/informers"

"github.com/kubeflow/tf-operator/pkg/apis/mxnet/validation"

"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -73,17 +75,12 @@ const (

var (
jobOwnerKey = ".metadata.controller"
// DefaultMXControllerConfiguration is the suggested mxnetjob-controller configuration for production.
DefaultMXControllerConfiguration = common.JobControllerConfiguration{
ReconcilerSyncLoopPeriod: metav1.Duration{Duration: 15 * time.Second},
EnableGangScheduling: false,
}
// DefaultCleanPodPolicy is the clean pod policy the controller assigns to a new Job that does not specify one.
DefaultCleanPodPolicy = commonv1.CleanPodPolicyNone
)

// NewReconciler creates a MXJob Reconciler
func NewReconciler(mgr manager.Manager) *MXJobReconciler {
func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *MXJobReconciler {
r := &MXJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -95,18 +92,22 @@ func NewReconciler(mgr manager.Manager) *MXJobReconciler {
cfg := mgr.GetConfig()
kubeClientSet := kubeclientset.NewForConfigOrDie(cfg)
volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg)
sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0)
priorityClassInformer := sharedInformers.Scheduling().V1beta1().PriorityClasses()

// Initialize common job controller
r.JobController = common.JobController{
Controller: r,
Expectations: expectation.NewControllerExpectations(),
Config: common.JobControllerConfiguration{EnableGangScheduling: false},
WorkQueue: &util.FakeWorkQueue{},
Recorder: r.Recorder,
KubeClientSet: kubeClientSet,
VolcanoClientSet: volcanoClientSet,
PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.Recorder},
ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.Recorder},
Controller: r,
Expectations: expectation.NewControllerExpectations(),
Config: common.JobControllerConfiguration{EnableGangScheduling: enableGangScheduling},
WorkQueue: &util.FakeWorkQueue{},
Recorder: r.Recorder,
KubeClientSet: kubeClientSet,
VolcanoClientSet: volcanoClientSet,
PriorityClassLister: priorityClassInformer.Lister(),
PriorityClassInformerSynced: priorityClassInformer.Informer().HasSynced,
PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.Recorder},
ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.Recorder},
}

return r
Expand Down
25 changes: 15 additions & 10 deletions pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -65,7 +66,7 @@ var (
)

// NewReconciler creates a PyTorchJob Reconciler
func NewReconciler(mgr manager.Manager) *PyTorchJobReconciler {
func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *PyTorchJobReconciler {
r := &PyTorchJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -77,18 +78,22 @@ func NewReconciler(mgr manager.Manager) *PyTorchJobReconciler {
cfg := mgr.GetConfig()
kubeClientSet := kubeclientset.NewForConfigOrDie(cfg)
volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg)
sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0)
priorityClassInformer := sharedInformers.Scheduling().V1beta1().PriorityClasses()

// Initialize common job controller
r.JobController = common.JobController{
Controller: r,
Expectations: expectation.NewControllerExpectations(),
Config: common.JobControllerConfiguration{EnableGangScheduling: false},
WorkQueue: &util.FakeWorkQueue{},
Recorder: r.recorder,
KubeClientSet: kubeClientSet,
VolcanoClientSet: volcanoClientSet,
PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder},
ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder},
Controller: r,
Expectations: expectation.NewControllerExpectations(),
Config: common.JobControllerConfiguration{EnableGangScheduling: enableGangScheduling},
WorkQueue: &util.FakeWorkQueue{},
Recorder: r.recorder,
KubeClientSet: kubeClientSet,
VolcanoClientSet: volcanoClientSet,
PriorityClassLister: priorityClassInformer.Lister(),
PriorityClassInformerSynced: priorityClassInformer.Informer().HasSynced,
PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder},
ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder},
}

return r
Expand Down
18 changes: 9 additions & 9 deletions pkg/controller.v1/register_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,20 @@ import (

const ErrTemplateSchemeNotSupported = "scheme %s is not supported yet"

type ReconcilerSetupFunc func(manager2 manager.Manager) error
type ReconcilerSetupFunc func(manager manager.Manager, enableGangScheduling bool) error

var SupportedSchemeReconciler = map[string]ReconcilerSetupFunc{
tensorflowv1.Kind: func(mgr manager.Manager) error {
return tensorflowcontroller.NewReconciler(mgr).SetupWithManager(mgr)
tensorflowv1.Kind: func(mgr manager.Manager, enableGangScheduling bool) error {
return tensorflowcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
},
pytorchv1.Kind: func(mgr manager.Manager) error {
return pytorchcontroller.NewReconciler(mgr).SetupWithManager(mgr)
pytorchv1.Kind: func(mgr manager.Manager, enableGangScheduling bool) error {
return pytorchcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
},
mxnetv1.Kind: func(mgr manager.Manager) error {
return mxnetcontroller.NewReconciler(mgr).SetupWithManager(mgr)
mxnetv1.Kind: func(mgr manager.Manager, enableGangScheduling bool) error {
return mxnetcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
},
xgboostv1.Kind: func(mgr manager.Manager) error {
return xgboostcontroller.NewReconciler(mgr).SetupWithManager(mgr)
xgboostv1.Kind: func(mgr manager.Manager, enableGangScheduling bool) error {
return xgboostcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
},
}

Expand Down
31 changes: 18 additions & 13 deletions pkg/controller.v1/tensorflow/tfjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"
"time"

"k8s.io/client-go/informers"

"github.com/go-logr/logr"
commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
"github.com/kubeflow/common/pkg/controller.v1/common"
Expand Down Expand Up @@ -93,7 +95,7 @@ const (
gangSchedulingPodGroupAnnotation = "scheduling.k8s.io/group-name"
)

func NewReconciler(mgr manager.Manager) *TFJobReconciler {
func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *TFJobReconciler {
r := &TFJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -104,17 +106,21 @@ func NewReconciler(mgr manager.Manager) *TFJobReconciler {
cfg := mgr.GetConfig()
kubeClientSet := kubeclientset.NewForConfigOrDie(cfg)
volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg)
sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0)
priorityClassInformer := sharedInformers.Scheduling().V1beta1().PriorityClasses()

r.JobController = common.JobController{
Controller: r,
Expectations: expectation.NewControllerExpectations(),
Config: common.JobControllerConfiguration{EnableGangScheduling: false},
WorkQueue: &util.FakeWorkQueue{},
Recorder: r.recorder,
KubeClientSet: kubeClientSet,
VolcanoClientSet: volcanoClientSet,
PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder},
ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder},
Controller: r,
Expectations: expectation.NewControllerExpectations(),
Config: common.JobControllerConfiguration{EnableGangScheduling: enableGangScheduling},
WorkQueue: &util.FakeWorkQueue{},
Recorder: r.recorder,
KubeClientSet: kubeClientSet,
VolcanoClientSet: volcanoClientSet,
PriorityClassLister: priorityClassInformer.Lister(),
PriorityClassInformerSynced: priorityClassInformer.Informer().HasSynced,
PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder},
ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder},
}

return r
Expand Down Expand Up @@ -758,7 +764,6 @@ func (r *TFJobReconciler) ReconcilePods(
return nil
}

// TODO (Jeffwan@): it touches too many low level objects like expectations etc
// createNewPod creates a new pod for the given index and type.
func (r *TFJobReconciler) createNewPod(tfjob *tfv1.TFJob, rt, index string, spec *commonv1.ReplicaSpec, masterRole bool,
replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) error {
Expand Down Expand Up @@ -814,9 +819,9 @@ func (r *TFJobReconciler) createNewPod(tfjob *tfv1.TFJob, rt, index string, spec

// if gang-scheduling is enabled:
// 1. if user has specified other scheduler, we report a warning without overriding any fields.
// 2. if no SchedulerName is set for pods, then we set the SchedulerName to "kube-batch".
// 2. if no SchedulerName is set for pods, then we set the SchedulerName to "volcano".
if r.Config.EnableGangScheduling {
if util.IsGangSchedulerSet(replicas, gangSchedulerName) {
if !util.IsGangSchedulerSet(replicas, gangSchedulerName) {
errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten"
logger.Warning(errMsg)
r.Recorder.Event(tfjob, v1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg)
Expand Down
26 changes: 16 additions & 10 deletions pkg/controller.v1/xgboost/xgboostjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"

"k8s.io/client-go/informers"

"github.com/kubeflow/tf-operator/pkg/apis/xgboost/validation"

"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -85,7 +87,7 @@ var (
)

// NewReconciler creates a XGBoostJob Reconciler
func NewReconciler(mgr manager.Manager) *XGBoostJobReconciler {
func NewReconciler(mgr manager.Manager, scheduling bool) *XGBoostJobReconciler {
r := &XGBoostJobReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand All @@ -97,18 +99,22 @@ func NewReconciler(mgr manager.Manager) *XGBoostJobReconciler {
cfg := mgr.GetConfig()
kubeClientSet := kubeclientset.NewForConfigOrDie(cfg)
volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg)
sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0)
priorityClassInformer := sharedInformers.Scheduling().V1beta1().PriorityClasses()

// Initialize common job controller
r.JobController = common.JobController{
Controller: r,
Expectations: expectation.NewControllerExpectations(),
Config: common.JobControllerConfiguration{EnableGangScheduling: false},
WorkQueue: &util.FakeWorkQueue{},
Recorder: r.recorder,
KubeClientSet: kubeClientSet,
VolcanoClientSet: volcanoClientSet,
PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder},
ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder},
Controller: r,
Expectations: expectation.NewControllerExpectations(),
Config: common.JobControllerConfiguration{EnableGangScheduling: false},
WorkQueue: &util.FakeWorkQueue{},
Recorder: r.recorder,
KubeClientSet: kubeClientSet,
VolcanoClientSet: volcanoClientSet,
PriorityClassLister: priorityClassInformer.Lister(),
PriorityClassInformerSynced: priorityClassInformer.Informer().HasSynced,
PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder},
ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder},
}

return r
Expand Down