*: filter informers when preconditions are met
When we can detect at startup time that all of the objects we're about
to look at have the labels we're expecting, we can filter our informer
factories upfront.

Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
stevekuznetsov committed Aug 28, 2023
1 parent 8ccd442 commit 96cca36
Showing 3 changed files with 173 additions and 51 deletions.
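
At a high level, both the catalog and OLM operators apply the same pattern, which the diffs below wire in piece by piece: validate the labelling precondition once at startup, then conditionally restrict the shared informer factories to OLM-managed objects. A consolidated sketch follows — illustrative only, not part of the diff; newFilteredFactory and the client/logger/metadataClient/resync parameters are placeholders, while labeller.Validate, install.OLMManagedLabelKey, and install.OLMManagedLabelValue come from this change (packages used mirror the diff's imports: client-go informers/metadata, apimachinery labels, logrus).

// newFilteredFactory is a hypothetical helper showing the gating pattern.
func newFilteredFactory(ctx context.Context, logger *logrus.Logger, client kubernetes.Interface, metadataClient metadata.Interface, resync time.Duration) (informers.SharedInformerFactory, error) {
	// Check once at startup whether every object OLM manages already carries
	// the OLM-managed label; only then is it safe to filter informer caches.
	canFilter, err := labeller.Validate(ctx, logger, metadataClient)
	if err != nil {
		return nil, err
	}
	opts := []informers.SharedInformerOption{}
	if canFilter {
		// Restrict LIST/WATCH calls to labelled objects so the shared caches
		// only hold what OLM manages.
		opts = append(opts, informers.WithTweakListOptions(func(o *metav1.ListOptions) {
			o.LabelSelector = labels.SelectorFromSet(labels.Set{
				install.OLMManagedLabelKey: install.OLMManagedLabelValue,
			}).String()
		}))
	}
	return informers.NewSharedInformerFactoryWithOptions(client, resync, opts...), nil
}
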
77 changes: 34 additions & 43 deletions pkg/controller/operators/catalog/operator.go
@@ -11,7 +11,6 @@ import (
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
errorwrap "github.com/pkg/errors"
@@ -187,6 +186,11 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

canFilter, err := labeller.Validate(ctx, logger, metadataClient)
if err != nil {
return nil, err
}

// Allocate the new instance of an Operator.
op := &Operator{
Operator: queueOperator,
@@ -363,7 +367,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}

// Wire k8s sharedIndexInformers
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod())
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), resyncPeriod(), func() []informers.SharedInformerOption {
if !canFilter {
return nil
}
return []informers.SharedInformerOption{informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
})}
}()...)
sharedIndexInformers := []cache.SharedIndexInformer{}

// Wire Roles
@@ -392,8 +403,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil
}

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("roles"), roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rolesgvk := rbacv1.SchemeGroupVersion.WithResource("roles")
if err := labelObjects(rolesgvk, roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.Filter(rolesgvk),
rbacv1applyconfigurations.Role,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
@@ -407,8 +419,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("rolebindings"), roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rolebindingsgvk := rbacv1.SchemeGroupVersion.WithResource("rolebindings")
if err := labelObjects(rolebindingsgvk, roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.Filter(rolebindingsgvk),
rbacv1applyconfigurations.RoleBinding,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
@@ -422,10 +435,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("serviceaccounts"), serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
return labeller.HasOLMOwnerRef(object) || labeller.HasOLMLabel(object)
},
serviceaccountsgvk := corev1.SchemeGroupVersion.WithResource("serviceaccounts")
if err := labelObjects(serviceaccountsgvk, serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, labeller.Filter(serviceaccountsgvk),
corev1applyconfigurations.ServiceAccount,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
@@ -440,8 +452,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())

// TODO(skuznets): some services don't seem to have any marker to key off of, but they match the operator name
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("services"), serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
servicesgvk := corev1.SchemeGroupVersion.WithResource("services")
if err := labelObjects(servicesgvk, serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.Filter(servicesgvk),
corev1applyconfigurations.Service,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
@@ -464,15 +477,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("pods"), csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
if labels := object.GetLabels(); labels != nil {
if _, ok := labels[reconciler.CatalogSourceLabelKey]; ok {
return true
}
}
return false
},
podsgvk := corev1.SchemeGroupVersion.WithResource("pods")
if err := labelObjects(podsgvk, csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, labeller.Filter(podsgvk),
corev1applyconfigurations.Pod,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
@@ -505,19 +512,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
jobInformer := k8sInformerFactory.Batch().V1().Jobs()
sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())

if err := labelObjects(batchv1.SchemeGroupVersion.WithResource("jobs"), jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
job, ok := object.(*batchv1.Job)
if !ok {
return false
}
for _, container := range job.Spec.Template.Spec.Containers {
if strings.Join(container.Command[0:3], " ") == "opm alpha bundle extract" {
return true
}
}
return false
},
jobsgvk := batchv1.SchemeGroupVersion.WithResource("jobs")
if err := labelObjects(jobsgvk, jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
ctx, op.logger, labeller.Filter(jobsgvk),
batchv1applyconfigurations.Job,
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
@@ -590,15 +587,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

if err := labelObjects(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), crdInformer, labeller.ObjectPatchLabeler[*apiextensionsv1.CustomResourceDefinition](
ctx, op.logger, func(object metav1.Object) bool {
for key := range object.GetAnnotations() {
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
return true
}
}
return false
},
customresourcedefinitionsgvk := apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")
if err := labelObjects(customresourcedefinitionsgvk, crdInformer, labeller.ObjectPatchLabeler[*apiextensionsv1.CustomResourceDefinition](
ctx, op.logger, labeller.Filter(customresourcedefinitionsgvk),
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
)); err != nil {
return nil, err
106 changes: 106 additions & 0 deletions pkg/controller/operators/labeller/filters.go
@@ -0,0 +1,106 @@
package labeller

import (
"context"
"fmt"
"strings"
"sync"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/reconciler"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/metadata"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
)

func Filter(gvr schema.GroupVersionResource) func(metav1.Object) bool {
if f, ok := filters[gvr]; ok {
return f
}
return func(object metav1.Object) bool {
return false
}
}

var filters = map[schema.GroupVersionResource]func(metav1.Object) bool{
corev1.SchemeGroupVersion.WithResource("services"): HasOLMOwnerRef,
corev1.SchemeGroupVersion.WithResource("pods"): func(object metav1.Object) bool {
if labels := object.GetLabels(); labels != nil {
if _, ok := labels[reconciler.CatalogSourceLabelKey]; ok {
return true
}
}
return false
},
corev1.SchemeGroupVersion.WithResource("serviceaccounts"): func(object metav1.Object) bool {
return HasOLMOwnerRef(object) || HasOLMLabel(object)
},
batchv1.SchemeGroupVersion.WithResource("jobs"): func(object metav1.Object) bool {
job, ok := object.(*batchv1.Job)
if !ok {
return false
}
for _, container := range job.Spec.Template.Spec.Containers {
if strings.Join(container.Command[0:3], " ") == "opm alpha bundle extract" {
return true
}
}
return false
},
appsv1.SchemeGroupVersion.WithResource("deployments"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("roles"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("rolebindings"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("clusterroles"): HasOLMOwnerRef,
rbacv1.SchemeGroupVersion.WithResource("clusterrolebindingss"): HasOLMOwnerRef,
apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"): func(object metav1.Object) bool {
for key := range object.GetAnnotations() {
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
return true
}
}
return false
},
}

func Validate(ctx context.Context, logger *logrus.Logger, metadataClient metadata.Interface) (bool, error) {
okLock := sync.Mutex{}
ok := true // assume all resources conform until a nonconforming item is found
g, ctx := errgroup.WithContext(ctx)
for gvr, filter := range filters {
gvr, filter := gvr, filter
g.Go(func() error {
list, err := metadataClient.Resource(gvr).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list %s: %w", gvr.String(), err)
}
var count int
for _, item := range list.Items {
if filter(&item) && !hasLabel(&item) {
count++
}
}
if count > 0 {
logger.WithFields(logrus.Fields{
"gvr": gvr.String(),
"nonconforming": count,
}).Info("found nonconforming items")
}
okLock.Lock()
ok = ok && count == 0
okLock.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return false, err
}
return ok, nil
}
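
For illustration only (not part of the commit; the label value and the secrets lookup are made-up examples), Filter hands back the registered predicate for a known GVR and a reject-everything fallback for anything else:

	keep := labeller.Filter(corev1.SchemeGroupVersion.WithResource("pods"))
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{reconciler.CatalogSourceLabelKey: "example-catalog"},
	}}
	_ = keep(pod) // true — the pod carries the catalog-source label

	other := labeller.Filter(corev1.SchemeGroupVersion.WithResource("secrets"))
	_ = other(&corev1.Secret{}) // false — no filter registered for secrets
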
41 changes: 33 additions & 8 deletions pkg/controller/operators/olm/operator.go
@@ -141,6 +141,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

canFilter, err := labeller.Validate(ctx, config.logger, config.metadataClient)
if err != nil {
return nil, err
}

op := &Operator{
Operator: queueOperator,
clock: config.clock,
@@ -315,7 +320,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
}

// Wire Deployments
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), informers.WithNamespace(namespace))
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), func() []informers.SharedInformerOption {
opts := []informers.SharedInformerOption{
informers.WithNamespace(namespace),
}
if canFilter {
opts = append(opts, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
}))
}
return opts
}()...)
depInformer := k8sInformerFactory.Apps().V1().Deployments()
informersByNamespace[namespace].DeploymentInformer = depInformer
op.lister.AppsV1().RegisterDeploymentLister(namespace, depInformer.Lister())
@@ -455,8 +470,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil
}

if err := labelObjects(appsv1.SchemeGroupVersion.WithResource("deployments"), informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
deploymentsgvk := appsv1.SchemeGroupVersion.WithResource("deployments")
if err := labelObjects(deploymentsgvk, informersByNamespace[metav1.NamespaceAll].DeploymentInformer.Informer(), labeller.ObjectLabeler[*appsv1.Deployment, *appsv1applyconfigurations.DeploymentApplyConfiguration](
ctx, op.logger, labeller.Filter(deploymentsgvk),
appsv1applyconfigurations.Deployment,
func(namespace string, ctx context.Context, cfg *appsv1applyconfigurations.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (*appsv1.Deployment, error) {
return op.opClient.KubernetesInterface().AppsV1().Deployments(namespace).Apply(ctx, cfg, opts)
@@ -502,7 +518,14 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

k8sInformerFactory := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod())
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), func() []informers.SharedInformerOption {
if !canFilter {
return nil
}
return []informers.SharedInformerOption{informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromSet(labels.Set{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
})}
}()...)
clusterRoleInformer := k8sInformerFactory.Rbac().V1().ClusterRoles()
informersByNamespace[metav1.NamespaceAll].ClusterRoleInformer = clusterRoleInformer
op.lister.RbacV1().RegisterClusterRoleLister(clusterRoleInformer.Lister())
@@ -519,8 +542,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterroles"), clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
clusterrolesgvk := rbacv1.SchemeGroupVersion.WithResource("clusterroles")
if err := labelObjects(clusterrolesgvk, clusterRoleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRole, *rbacv1applyconfigurations.ClusterRoleApplyConfiguration](
ctx, op.logger, labeller.Filter(clusterrolesgvk),
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleApplyConfiguration {
return rbacv1applyconfigurations.ClusterRole(name)
},
@@ -547,8 +571,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindingss"), clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
clusterrolebindingssgvk := rbacv1.SchemeGroupVersion.WithResource("clusterrolebindingss")
if err := labelObjects(clusterrolebindingssgvk, clusterRoleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.ClusterRoleBinding, *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration](
ctx, op.logger, labeller.Filter(clusterrolebindingssgvk),
func(name, _ string) *rbacv1applyconfigurations.ClusterRoleBindingApplyConfiguration {
return rbacv1applyconfigurations.ClusterRoleBinding(name)
},
