Skip to content

Commit

Permalink
feat:add k8s job to watch and remove unused client for minioJob (mini…
Browse files Browse the repository at this point in the history
…o#2027)

* add k8s job to watch

* update go mod
  • Loading branch information
jiuker authored Mar 11, 2024
1 parent baccdf6 commit 798825d
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 33 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ require (
github.com/lestrrat-go/blackmagic v1.0.2 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
github.com/lestrrat-go/iter v1.0.2 // indirect
github.com/lestrrat-go/jwx v1.2.28 // indirect
github.com/lestrrat-go/jwx v1.2.29 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect
Expand Down
10 changes: 7 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZ
github.com/lestrrat-go/httpcc v1.0.1/go.mod h1:qiltp3Mt56+55GPVCbTdM9MlqhvzyuL6W/NMDA8vA5E=
github.com/lestrrat-go/iter v1.0.2 h1:gMXo1q4c2pHmC3dn8LzRhJfP1ceCbgSiT9lUydIzltI=
github.com/lestrrat-go/iter v1.0.2/go.mod h1:Momfcq3AnRlRjI5b5O8/G5/BvpzrhoFTZcn06fEOPt4=
github.com/lestrrat-go/jwx v1.2.28 h1:uadI6o0WpOVrBSf498tRXZIwPpEtLnR9CvqPFXeI5sA=
github.com/lestrrat-go/jwx v1.2.28/go.mod h1:nF+91HEMh/MYFVwKPl5HHsBGMPscqbQb+8IDQdIazP8=
github.com/lestrrat-go/jwx v1.2.29 h1:QT0utmUJ4/12rmsVQrJ3u55bycPkKqGYuGT4tyRhxSQ=
github.com/lestrrat-go/jwx v1.2.29/go.mod h1:hU8k2l6WF0ncx20uQdOmik/Gjg6E3/wIRtXSNFeZuB8=
github.com/lestrrat-go/option v1.0.0/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU=
github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I=
Expand Down Expand Up @@ -351,6 +351,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down Expand Up @@ -415,7 +416,7 @@ golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWP
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211209193657-4570a0811e8b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e h1:+WEEuIdZHnUeJJmEUjyYC2gfUMj69yZXw17EnHg/otA=
Expand All @@ -435,6 +436,7 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.15.0 h1:s8pnnxNVzjWyrvYdFUQq5llS1PX2zhPXmccZv99h7uQ=
Expand Down Expand Up @@ -474,6 +476,7 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand All @@ -482,6 +485,7 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func StartOperator(kubeconfig string) {
hostsTemplate,
pkg.Version,
minioInformerFactory.Job().V1alpha1().MinIOJobs(),
kubeInformerFactory.Batch().V1().Jobs(),
)

go kubeInformerFactory.Start(stopCh)
Expand Down
49 changes: 27 additions & 22 deletions pkg/controller/job-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ import (
"github.com/minio/operator/pkg/apis/job.min.io/v1alpha1"
miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2"
stsv1alpha1 "github.com/minio/operator/pkg/apis/sts.min.io/v1alpha1"
clientset "github.com/minio/operator/pkg/client/clientset/versioned"
jobinformers "github.com/minio/operator/pkg/client/informers/externalversions/job.min.io/v1alpha1"
joblisters "github.com/minio/operator/pkg/client/listers/job.min.io/v1alpha1"
batchjobv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
batchv1 "k8s.io/client-go/informers/batch/v1"
"k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1"
k8sjoblisters "k8s.io/client-go/listers/batch/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
Expand All @@ -31,13 +32,12 @@ import (
// JobController struct watches the Kubernetes API for changes to Tenant resources
type JobController struct {
namespacesToWatch set.StringSet
lister joblisters.MinIOJobLister
hasSynced cache.InformerSynced
kubeClientSet kubernetes.Interface
statefulSetLister appslisters.StatefulSetLister
minioJobLister joblisters.MinIOJobLister
minioJobHasSynced cache.InformerSynced
jobLister k8sjoblisters.JobLister
jobHasSynced cache.InformerSynced
recorder record.EventRecorder
workqueue workqueue.RateLimitingInterface
minioClientSet clientset.Interface
k8sClient client.Client
}

Expand Down Expand Up @@ -73,49 +73,54 @@ func (c *JobController) enqueueJob(obj interface{}) {

// NewJobController returns a new Operator Controller
func NewJobController(
jobinformer jobinformers.MinIOJobInformer,
minioJobInformer jobinformers.MinIOJobInformer,
jobInformer batchv1.JobInformer,
namespacesToWatch set.StringSet,
joblister joblisters.MinIOJobLister,
hasSynced cache.InformerSynced,
kubeClientSet kubernetes.Interface,
statefulSetLister appslisters.StatefulSetLister,
recorder record.EventRecorder,
workqueue workqueue.RateLimitingInterface,
minioClientSet clientset.Interface,
k8sClient client.Client,
) *JobController {
controller := &JobController{
namespacesToWatch: namespacesToWatch,
lister: joblister,
hasSynced: hasSynced,
kubeClientSet: kubeClientSet,
statefulSetLister: statefulSetLister,
minioJobLister: minioJobInformer.Lister(),
minioJobHasSynced: minioJobInformer.Informer().HasSynced,
jobLister: jobInformer.Lister(),
jobHasSynced: jobInformer.Informer().HasSynced,
recorder: recorder,
workqueue: workqueue,
minioClientSet: minioClientSet,
k8sClient: k8sClient,
}

// Set up an event handler for when resources change
jobinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
minioJobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueJob,
UpdateFunc: func(old, new interface{}) {
oldJob := old.(*v1alpha1.MinIOJob)
newJob := new.(*v1alpha1.MinIOJob)
if oldJob.ResourceVersion == newJob.ResourceVersion {
// Periodic resync will send update events for all known Tenants.
// Two different versions of the same Tenant will always have different RVs.
return
}
controller.enqueueJob(new)
},
})

jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
oldJob := old.(*batchjobv1.Job)
newJob := new.(*batchjobv1.Job)
if oldJob.ResourceVersion == newJob.ResourceVersion {
return
}
// todo record the job status.
},
})
return controller
}

// HasSynced is to determine if obj is synced
func (c *JobController) HasSynced() cache.InformerSynced {
return c.hasSynced
return c.minioJobHasSynced
}

// HandleObject will take any resource implementing metav1.Object and attempt
Expand All @@ -125,7 +130,7 @@ func (c *JobController) HandleObject(obj metav1.Object) {
if ownerRef := metav1.GetControllerOf(obj); ownerRef != nil {
switch ownerRef.Kind {
case JobCRDResourceKind:
job, err := c.lister.MinIOJobs(obj.GetNamespace()).Get(ownerRef.Name)
job, err := c.minioJobLister.MinIOJobs(obj.GetNamespace()).Get(ownerRef.Name)
if err != nil {
klog.V(4).Info("Ignore orphaned object", "object", klog.KObj(job), JobCRDResourceKind, ownerRef.Name)
return
Expand Down
19 changes: 12 additions & 7 deletions pkg/controller/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsinformers "k8s.io/client-go/informers/apps/v1"
batchv1 "k8s.io/client-go/informers/batch/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -237,7 +238,8 @@ func NewController(
serviceInformer coreinformers.ServiceInformer,
hostsTemplate,
operatorVersion string,
jobinformer jobinformers.MinIOJobInformer,
minioJobinformer jobinformers.MinIOJobInformer,
jobInformer batchv1.JobInformer,
) *Controller {
// Create event broadcaster
// Add minio-controller types to the default Kubernetes Scheme so Events can be
Expand Down Expand Up @@ -301,15 +303,12 @@ func NewController(
operatorImage: oprImg,
controllers: []*JobController{
NewJobController(
jobinformer,
minioJobinformer,
jobInformer,
namespacesToWatch,
jobinformer.Lister(),
jobinformer.Informer().HasSynced,
kubeClientSet,
statefulSetInformer.Lister(),
recorder,
queue.NewNamedRateLimitingQueue(MinIOControllerRateLimiter(), "Tenants"),
minioClientSet,
queue.NewNamedRateLimitingQueue(MinIOControllerRateLimiter(), "MinioJobs"),
k8sClient,
),
},
Expand Down Expand Up @@ -451,6 +450,12 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha
if ok := cache.WaitForCacheSync(stopCh, c.statefulSetListerSynced, c.deploymentListerSynced, c.tenantsSynced, c.policyBindingListerSynced); !ok {
panic("failed to wait for caches to sync")
}
// Wait for the caches to be synced before starting workers
for _, jobController := range c.controllers {
if ok := cache.WaitForCacheSync(stopCh, jobController.minioJobHasSynced, jobController.jobHasSynced); !ok {
panic("failed to wait for caches to sync")
}
}

klog.Info("Starting workers and Job workers")
JobController := c.controllers[0]
Expand Down

0 comments on commit 798825d

Please sign in to comment.