Skip to content

Commit

Permalink
fix #1706 (#1707)
Browse files Browse the repository at this point in the history
  • Loading branch information
HeGaoYuan authored Dec 26, 2022
1 parent 69813fb commit e13d262
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 30 deletions.
4 changes: 3 additions & 1 deletion cmd/training-operator.v1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func main() {
var gangSchedulerName string
var namespace string
var monitoringPort int
var controllerThreads int
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
Expand All @@ -74,6 +75,7 @@ func main() {
"If set, it only monitors kubeflow jobs in the given namespace.")
flag.IntVar(&monitoringPort, "monitoring-port", 9443, "Endpoint port for displaying monitoring metrics. "+
"It can be set to \"0\" to disable the metrics serving.")
flag.IntVar(&controllerThreads, "controller-threads", 1, "Number of worker threads used by the controller.")

// PyTorch related flags
flag.StringVar(&config.Config.PyTorchInitContainerImage, "pytorch-init-container-image",
Expand Down Expand Up @@ -120,7 +122,7 @@ func main() {
"scheme not supported", "scheme", s)
os.Exit(1)
}
if err = setupFunc(mgr, enableGangScheduling); err != nil {
if err = setupFunc(mgr, enableGangScheduling, controllerThreads); err != nil {
setupLog.Error(err, "unable to create controller", "controller", s)
os.Exit(1)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller.v1/mpi/mpijob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,10 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

// SetupWithManager sets up the controller with the Manager.
func (jc *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (jc *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error {
c, err := controller.New(jc.ControllerName(), mgr, controller.Options{
Reconciler: jc,
Reconciler: jc,
MaxConcurrentReconciles: controllerThreads,
})

if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/mpi/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())

reconciler = NewReconciler(mgr, false)
Expect(reconciler.SetupWithManager(mgr)).NotTo(HaveOccurred())
Expect(reconciler.SetupWithManager(mgr, 1)).NotTo(HaveOccurred())

go func() {
defer GinkgoRecover()
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller.v1/mxnet/mxjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,10 @@ func (r *MXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}

// SetupWithManager sets up the controller with the Manager.
func (r *MXJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *MXJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error {
c, err := controller.New(r.ControllerName(), mgr, controller.Options{
Reconciler: r,
Reconciler: r,
MaxConcurrentReconciles: controllerThreads,
})

if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,10 @@ func (r *PaddleJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

// SetupWithManager sets up the controller with the Manager.
func (r *PaddleJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *PaddleJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error {
c, err := controller.New(r.ControllerName(), mgr, controller.Options{
Reconciler: r,
Reconciler: r,
MaxConcurrentReconciles: controllerThreads,
})

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ var _ = BeforeSuite(func() {

r := NewReconciler(mgr, false)

Expect(r.SetupWithManager(mgr)).NotTo(gomega.HaveOccurred())
Expect(r.SetupWithManager(mgr, 1)).NotTo(gomega.HaveOccurred())

go func() {
defer GinkgoRecover()
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller.v1/pytorch/pytorchjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,10 @@ func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// SetupWithManager sets up the controller with the Manager.
func (r *PyTorchJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *PyTorchJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error {
c, err := controller.New(r.ControllerName(), mgr, controller.Options{
Reconciler: r,
Reconciler: r,
MaxConcurrentReconciles: controllerThreads,
})

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var _ = BeforeSuite(func() {

r := NewReconciler(mgr, false)

Expect(r.SetupWithManager(mgr)).NotTo(gomega.HaveOccurred())
Expect(r.SetupWithManager(mgr, 1)).NotTo(gomega.HaveOccurred())

go func() {
defer GinkgoRecover()
Expand Down
26 changes: 13 additions & 13 deletions pkg/controller.v1/register_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,26 @@ import (

const ErrTemplateSchemeNotSupported = "scheme %s is not supported yet"

type ReconcilerSetupFunc func(manager manager.Manager, enableGangScheduling bool) error
type ReconcilerSetupFunc func(manager manager.Manager, enableGangScheduling bool, controllerThreads int) error

var SupportedSchemeReconciler = map[string]ReconcilerSetupFunc{
kubeflowv1.TFJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
return tensorflowcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
kubeflowv1.TFJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
return tensorflowcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
},
kubeflowv1.PytorchJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
return pytorchcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
kubeflowv1.PytorchJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
return pytorchcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
},
kubeflowv1.MXJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
return mxnetcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
kubeflowv1.MXJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
return mxnetcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
},
kubeflowv1.XGBoostJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
return xgboostcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
kubeflowv1.XGBoostJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
return xgboostcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
},
kubeflowv1.MPIJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
return mpicontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
kubeflowv1.MPIJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
return mpicontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
},
kubeflowv1.PaddleJobKind: func(mgr manager.Manager, enableGangScheduling bool) error {
return paddlecontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr)
kubeflowv1.PaddleJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error {
return paddlecontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads)
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1/tensorflow/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())

reconciler = NewReconciler(mgr, false)
Expect(reconciler.SetupWithManager(mgr)).NotTo(HaveOccurred())
Expect(reconciler.SetupWithManager(mgr, 1)).NotTo(HaveOccurred())

go func() {
defer GinkgoRecover()
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller.v1/tensorflow/tfjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,10 @@ func (r *TFJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}

// SetupWithManager sets up the controller with the Manager.
func (r *TFJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *TFJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error {
c, err := controller.New(r.ControllerName(), mgr, controller.Options{
Reconciler: r,
Reconciler: r,
MaxConcurrentReconciles: controllerThreads,
})

if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller.v1/xgboost/xgboostjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,10 @@ func (r *XGBoostJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// SetupWithManager sets up the controller with the Manager.
func (r *XGBoostJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *XGBoostJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error {
c, err := controller.New(r.ControllerName(), mgr, controller.Options{
Reconciler: r,
Reconciler: r,
MaxConcurrentReconciles: controllerThreads,
})

if err != nil {
Expand Down

0 comments on commit e13d262

Please sign in to comment.