Skip to content

Commit

Permalink
Merge pull request #405 from wangyuqing4/vc-ctrl
Browse files Browse the repository at this point in the history
clean some redundant  job/pg/gc controllers code
  • Loading branch information
volcano-sh-bot authored Aug 3, 2019
2 parents 1a856a1 + e4e871e commit 8dd5a34
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 68 deletions.
42 changes: 23 additions & 19 deletions cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -38,8 +39,6 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

kbver "volcano.sh/volcano/pkg/client/clientset/versioned"

"volcano.sh/volcano/cmd/controllers/app/options"
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/garbagecollector"
Expand Down Expand Up @@ -81,23 +80,7 @@ func Run(opt *options.ServerOption) error {
return err
}

// TODO: add user agent for different controllers
kubeClient := clientset.NewForConfigOrDie(config)
kbClient := kbver.NewForConfigOrDie(config)
vkClient := vkclient.NewForConfigOrDie(config)

jobController := job.NewJobController(kubeClient, kbClient, vkClient, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, kbClient)
garbageCollector := garbagecollector.New(vkClient)
pgController := podgroup.NewPodgroupController(kubeClient, kbClient, opt.SchedulerName)

run := func(ctx context.Context) {
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
go pgController.Run(ctx.Done())
<-ctx.Done()
}
run := startControllers(config, opt)

if !opt.EnableLeaderElection {
run(context.TODO())
Expand Down Expand Up @@ -147,3 +130,24 @@ func Run(opt *options.ServerOption) error {
})
return fmt.Errorf("lost lease")
}

func startControllers(config *rest.Config, opt *options.ServerOption) func(ctx context.Context) {
// TODO: add user agent for different controllers
kubeClient := clientset.NewForConfigOrDie(config)
vkClient := vkclient.NewForConfigOrDie(config)

sharedInformers := informers.NewSharedInformerFactory(kubeClient, 0)

jobController := job.NewJobController(kubeClient, vkClient, sharedInformers, opt.WorkerThreads)
queueController := queue.NewQueueController(kubeClient, vkClient)
garbageCollector := garbagecollector.NewGarbageCollector(vkClient)
pgController := podgroup.NewPodgroupController(kubeClient, vkClient, sharedInformers, opt.SchedulerName)

return func(ctx context.Context) {
go jobController.Run(ctx.Done())
go queueController.Run(ctx.Done())
go garbageCollector.Run(ctx.Done())
go pgController.Run(ctx.Done())
<-ctx.Done()
}
}
4 changes: 2 additions & 2 deletions pkg/controllers/garbagecollector/garbagecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type GarbageCollector struct {
queue workqueue.RateLimitingInterface
}

// New creates an instance of GarbageCollector
func New(vkClient vkver.Interface) *GarbageCollector {
// NewGarbageCollector creates an instance of GarbageCollector
func NewGarbageCollector(vkClient vkver.Interface) *GarbageCollector {
jobInformer := vkinfoext.NewSharedInformerFactory(vkClient, 0).Batch().V1alpha1().Jobs()

gb := &GarbageCollector{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/garbagecollector/garbagecollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestGarbageCollector_ProcessTTL(t *testing.T) {
},
}
for i, testcase := range testcases {
gc := New(volcanoclient.NewSimpleClientset())
gc := NewGarbageCollector(volcanoclient.NewSimpleClientset())

expired, err := gc.processTTL(testcase.Job)
if err != nil {
Expand Down
32 changes: 13 additions & 19 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

kbver "volcano.sh/volcano/pkg/client/clientset/versioned"
kbinfoext "volcano.sh/volcano/pkg/client/informers/externalversions"
kbinfo "volcano.sh/volcano/pkg/client/informers/externalversions/scheduling/v1alpha2"
kblister "volcano.sh/volcano/pkg/client/listers/scheduling/v1alpha2"
Expand All @@ -61,16 +60,14 @@ import (
type Controller struct {
kubeClients kubernetes.Interface
vkClients vkver.Interface
kbClients kbver.Interface

jobInformer vkbatchinfo.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer kbinfo.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer vkcoreinfo.CommandInformer
pcInformer schedv1.PriorityClassInformer
sharedInformers informers.SharedInformerFactory
jobInformer vkbatchinfo.JobInformer
podInformer coreinformers.PodInformer
pvcInformer coreinformers.PersistentVolumeClaimInformer
pgInformer kbinfo.PodGroupInformer
svcInformer coreinformers.ServiceInformer
cmdInformer vkcoreinfo.CommandInformer
pcInformer schedv1.PriorityClassInformer

// A store of jobs
jobLister vkbatchlister.JobLister
Expand Down Expand Up @@ -113,8 +110,8 @@ type Controller struct {
// NewJobController create new Job Controller
func NewJobController(
kubeClient kubernetes.Interface,
kbClient kbver.Interface,
vkClient vkver.Interface,
sharedInformers informers.SharedInformerFactory,
workers uint32,
) *Controller {

Expand All @@ -127,7 +124,6 @@ func NewJobController(
cc := &Controller{
kubeClients: kubeClient,
vkClients: vkClient,
kbClients: kbClient,
queueList: make([]workqueue.RateLimitingInterface, workers, workers),
commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
cache: jobcache.New(),
Expand Down Expand Up @@ -158,8 +154,7 @@ func NewJobController(
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0)
cc.podInformer = cc.sharedInformers.Core().V1().Pods()
cc.podInformer = sharedInformers.Core().V1().Pods()
cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPod,
UpdateFunc: cc.updatePod,
Expand All @@ -169,22 +164,22 @@ func NewJobController(
cc.podLister = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced

cc.pvcInformer = cc.sharedInformers.Core().V1().PersistentVolumeClaims()
cc.pvcInformer = sharedInformers.Core().V1().PersistentVolumeClaims()
cc.pvcLister = cc.pvcInformer.Lister()
cc.pvcSynced = cc.pvcInformer.Informer().HasSynced

cc.svcInformer = cc.sharedInformers.Core().V1().Services()
cc.svcInformer = sharedInformers.Core().V1().Services()
cc.svcLister = cc.svcInformer.Lister()
cc.svcSynced = cc.svcInformer.Informer().HasSynced

cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.kbClients, 0).Scheduling().V1alpha2().PodGroups()
cc.pgInformer = kbinfoext.NewSharedInformerFactory(cc.vkClients, 0).Scheduling().V1alpha2().PodGroups()
cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: cc.updatePodGroup,
})
cc.pgLister = cc.pgInformer.Lister()
cc.pgSynced = cc.pgInformer.Informer().HasSynced

cc.pcInformer = cc.sharedInformers.Scheduling().V1beta1().PriorityClasses()
cc.pcInformer = sharedInformers.Scheduling().V1beta1().PriorityClasses()
cc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPriorityClass,
DeleteFunc: cc.deletePriorityClass,
Expand All @@ -203,7 +198,6 @@ func NewJobController(
// Run start JobController
func (cc *Controller) Run(stopCh <-chan struct{}) {

go cc.sharedInformers.Start(stopCh)
go cc.jobInformer.Informer().Run(stopCh)
go cc.podInformer.Informer().Run(stopCh)
go cc.pvcInformer.Informer().Run(stopCh)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
}

// Delete PodGroup
if err := cc.kbClients.SchedulingV1alpha2().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil {
if err := cc.vkClients.SchedulingV1alpha2().PodGroups(job.Namespace).Delete(job.Name, nil); err != nil {
if !apierrors.IsNotFound(err) {
glog.Errorf("Failed to delete PodGroup of Job %v/%v: %v",
job.Namespace, job.Name, err)
Expand Down Expand Up @@ -468,7 +468,7 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error {
},
}

if _, err = cc.kbClients.SchedulingV1alpha2().PodGroups(job.Namespace).Create(pg); err != nil {
if _, err = cc.vkClients.SchedulingV1alpha2().PodGroups(job.Namespace).Create(pg); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(3).Infof("Failed to create PodGroup for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func TestCreatePodGroupIfNotExistFunc(t *testing.T) {
t.Errorf("Expected return value to be equal to expected: %s, but got: %s", testcase.ExpextVal, err)
}

_, err = fakeController.kbClients.SchedulingV1alpha2().PodGroups(namespace).Get(testcase.Job.Name, metav1.GetOptions{})
_, err = fakeController.vkClients.SchedulingV1alpha2().PodGroups(namespace).Get(testcase.Job.Name, metav1.GetOptions{})
if err != nil {
t.Error("Expected PodGroup to get created, but not created")
}
Expand Down
27 changes: 10 additions & 17 deletions pkg/controllers/job/job_controller_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ package job

import (
"fmt"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"testing"
"volcano.sh/volcano/pkg/apis/helpers"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
vkbusv1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
kbv1 "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
kubebatchclient "volcano.sh/volcano/pkg/client/clientset/versioned"
vkclientset "volcano.sh/volcano/pkg/client/clientset/versioned"
//"volcano.sh/volcano/pkg/controllers/job"
)

func newController() *Controller {
Expand All @@ -45,23 +45,16 @@ func newController() *Controller {
},
)

kubeBatchClientSet := kubebatchclient.NewForConfigOrDie(&rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &kbv1.SchemeGroupVersion,
},
},
)

config := &rest.Config{
vkclient := vkclientset.NewForConfigOrDie(&rest.Config{
Host: "",
ContentConfig: rest.ContentConfig{
GroupVersion: &vkv1.SchemeGroupVersion,
},
}
})

sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0)

vkclient := vkclientset.NewForConfigOrDie(config)
controller := NewJobController(kubeClientSet, kubeBatchClientSet, vkclient, 3)
controller := NewJobController(kubeClientSet, vkclient, sharedInformers, 3)

return controller
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/job/job_controller_plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ import (

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
kubeclient "k8s.io/client-go/kubernetes/fake"

vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
volcanoclient "volcano.sh/volcano/pkg/client/clientset/versioned/fake"

kubebatchclient "volcano.sh/volcano/pkg/client/clientset/versioned/fake"
)

func newFakeController() *Controller {
KubeBatchClientSet := kubebatchclient.NewSimpleClientset()
VolcanoClientSet := volcanoclient.NewSimpleClientset()
KubeClientSet := kubeclient.NewSimpleClientset()

controller := NewJobController(KubeClientSet, KubeBatchClientSet, VolcanoClientSet, 3)
sharedInformers := informers.NewSharedInformerFactory(KubeClientSet, 0)

controller := NewJobController(KubeClientSet, VolcanoClientSet, sharedInformers, 3)
return controller
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/podgroup/pg_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Controller struct {
func NewPodgroupController(
kubeClient kubernetes.Interface,
kbClient kbver.Interface,
sharedInformers informers.SharedInformerFactory,
schedulerName string,
) *Controller {
cc := &Controller{
Expand All @@ -67,8 +68,7 @@ func NewPodgroupController(
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}

factory := informers.NewSharedInformerFactory(cc.kubeClients, 0)
cc.podInformer = factory.Core().V1().Pods()
cc.podInformer = sharedInformers.Core().V1().Pods()
cc.podLister = cc.podInformer.Lister()
cc.podSynced = cc.podInformer.Informer().HasSynced
cc.podInformer.Informer().AddEventHandler(
Expand Down
4 changes: 3 additions & 1 deletion pkg/controllers/podgroup/pg_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
kubeclient "k8s.io/client-go/kubernetes/fake"

scheduling "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
Expand All @@ -31,8 +32,9 @@ import (
func newFakeController() *Controller {
KubeClientSet := kubeclient.NewSimpleClientset()
KubeBatchClientSet := kubebatchclient.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(KubeClientSet, 0)

controller := NewPodgroupController(KubeClientSet, KubeBatchClientSet, "volcano")
controller := NewPodgroupController(KubeClientSet, KubeBatchClientSet, sharedInformers, "volcano")
return controller
}

Expand Down

0 comments on commit 8dd5a34

Please sign in to comment.