Skip to content

Commit

Permalink
*: label k8s objects we own
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kuznetsov <skuznets@redhat.com>
  • Loading branch information
stevekuznetsov committed Aug 28, 2023
1 parent 9e7031f commit 8ccd442
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 7 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/coreos/go-semver v0.3.0
github.com/davecgh/go-spew v1.1.1
github.com/distribution/distribution v2.7.1+incompatible
github.com/evanphx/json-patch v5.6.0+incompatible
github.com/fsnotify/fsnotify v1.6.0
github.com/ghodss/yaml v1.0.0
github.com/go-air/gini v1.0.4
Expand Down Expand Up @@ -95,7 +96,6 @@ require (
github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.10.2 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
github.com/fatih/color v1.13.0 // indirect
Expand Down
124 changes: 124 additions & 0 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ import (
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/internal/alongside"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
errorwrap "github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/connectivity"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
Expand All @@ -32,6 +35,9 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/yaml"
batchv1applyconfigurations "k8s.io/client-go/applyconfigurations/batch/v1"
corev1applyconfigurations "k8s.io/client-go/applyconfigurations/core/v1"
rbacv1applyconfigurations "k8s.io/client-go/applyconfigurations/rbac/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/metadata"
Expand Down Expand Up @@ -103,6 +109,7 @@ type Operator struct {
client versioned.Interface
dynamicClient dynamic.Interface
lister operatorlister.OperatorLister
k8sLabelQueueSets map[schema.GroupVersionResource]workqueue.RateLimitingInterface
catsrcQueueSet *queueinformer.ResourceQueueSet
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
Expand Down Expand Up @@ -191,6 +198,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
lister: lister,
namespace: operatorNamespace,
recorder: eventRecorder,
k8sLabelQueueSets: map[schema.GroupVersionResource]workqueue.RateLimitingInterface{},
catsrcQueueSet: queueinformer.NewEmptyResourceQueueSet(),
subQueueSet: queueinformer.NewEmptyResourceQueueSet(),
ipQueueSet: queueinformer.NewEmptyResourceQueueSet(),
Expand Down Expand Up @@ -363,21 +371,85 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.RbacV1().RegisterRoleLister(metav1.NamespaceAll, roleInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, roleInformer.Informer())

labelObjects := func(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer, sync func(obj interface{}) error) error {
op.k8sLabelQueueSets[gvr] = workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
Name: gvr.String(),
})
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(informer),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(sync).ToSyncer()),
)
if err != nil {
return err
}

if err := op.RegisterQueueInformer(queueInformer); err != nil {
return err
}

return nil
}

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("roles"), roleInformer.Informer(), labeller.ObjectLabeler[*rbacv1.Role, *rbacv1applyconfigurations.RoleApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rbacv1applyconfigurations.Role,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.Role, error) {
return op.opClient.KubernetesInterface().RbacV1().Roles(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire RoleBindings
roleBindingInformer := k8sInformerFactory.Rbac().V1().RoleBindings()
op.lister.RbacV1().RegisterRoleBindingLister(metav1.NamespaceAll, roleBindingInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, roleBindingInformer.Informer())

if err := labelObjects(rbacv1.SchemeGroupVersion.WithResource("rolebindings"), roleBindingInformer.Informer(), labeller.ObjectLabeler[*rbacv1.RoleBinding, *rbacv1applyconfigurations.RoleBindingApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
rbacv1applyconfigurations.RoleBinding,
func(namespace string, ctx context.Context, cfg *rbacv1applyconfigurations.RoleBindingApplyConfiguration, opts metav1.ApplyOptions) (*rbacv1.RoleBinding, error) {
return op.opClient.KubernetesInterface().RbacV1().RoleBindings(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire ServiceAccounts
serviceAccountInformer := k8sInformerFactory.Core().V1().ServiceAccounts()
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceAccountInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("serviceaccounts"), serviceAccountInformer.Informer(), labeller.ObjectLabeler[*corev1.ServiceAccount, *corev1applyconfigurations.ServiceAccountApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
return labeller.HasOLMOwnerRef(object) || labeller.HasOLMLabel(object)
},
corev1applyconfigurations.ServiceAccount,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceAccountApplyConfiguration, opts metav1.ApplyOptions) (*corev1.ServiceAccount, error) {
return op.opClient.KubernetesInterface().CoreV1().ServiceAccounts(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire Services
serviceInformer := k8sInformerFactory.Core().V1().Services()
op.lister.CoreV1().RegisterServiceLister(metav1.NamespaceAll, serviceInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, serviceInformer.Informer())

// TODO(skuznets): some services don't seem to have any marker to key off of, but they match the operator name
if err := labelObjects(corev1.SchemeGroupVersion.WithResource("services"), serviceInformer.Informer(), labeller.ObjectLabeler[*corev1.Service, *corev1applyconfigurations.ServiceApplyConfiguration](
ctx, op.logger, labeller.HasOLMOwnerRef,
corev1applyconfigurations.Service,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.ServiceApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Service, error) {
return op.opClient.KubernetesInterface().CoreV1().Services(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire Pods for CatalogSource
catsrcReq, err := labels.NewRequirement(reconciler.CatalogSourceLabelKey, selection.Exists, nil)
if err != nil {
Expand All @@ -392,6 +464,23 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
op.lister.CoreV1().RegisterPodLister(metav1.NamespaceAll, csPodInformer.Lister())
sharedIndexInformers = append(sharedIndexInformers, csPodInformer.Informer())

if err := labelObjects(corev1.SchemeGroupVersion.WithResource("pods"), csPodInformer.Informer(), labeller.ObjectLabeler[*corev1.Pod, *corev1applyconfigurations.PodApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
if labels := object.GetLabels(); labels != nil {
if _, ok := labels[reconciler.CatalogSourceLabelKey]; ok {
return true
}
}
return false
},
corev1applyconfigurations.Pod,
func(namespace string, ctx context.Context, cfg *corev1applyconfigurations.PodApplyConfiguration, opts metav1.ApplyOptions) (*corev1.Pod, error) {
return op.opClient.KubernetesInterface().CoreV1().Pods(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Wire Pods for BundleUnpack job
buReq, err := labels.NewRequirement(bundle.BundleUnpackPodLabel, selection.Exists, nil)
if err != nil {
Expand All @@ -416,6 +505,27 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
jobInformer := k8sInformerFactory.Batch().V1().Jobs()
sharedIndexInformers = append(sharedIndexInformers, jobInformer.Informer())

if err := labelObjects(batchv1.SchemeGroupVersion.WithResource("jobs"), jobInformer.Informer(), labeller.ObjectLabeler[*batchv1.Job, *batchv1applyconfigurations.JobApplyConfiguration](
ctx, op.logger, func(object metav1.Object) bool {
job, ok := object.(*batchv1.Job)
if !ok {
return false
}
for _, container := range job.Spec.Template.Spec.Containers {
if strings.Join(container.Command[0:3], " ") == "opm alpha bundle extract" {
return true
}
}
return false
},
batchv1applyconfigurations.Job,
func(namespace string, ctx context.Context, cfg *batchv1applyconfigurations.JobApplyConfiguration, opts metav1.ApplyOptions) (*batchv1.Job, error) {
return op.opClient.KubernetesInterface().BatchV1().Jobs(namespace).Apply(ctx, cfg, opts)
},
)); err != nil {
return nil, err
}

// Generate and register QueueInformers for k8s resources
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
for _, informer := range sharedIndexInformers {
Expand Down Expand Up @@ -480,6 +590,20 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
return nil, err
}

if err := labelObjects(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"), crdInformer, labeller.ObjectPatchLabeler[*apiextensionsv1.CustomResourceDefinition](
ctx, op.logger, func(object metav1.Object) bool {
for key := range object.GetAnnotations() {
if strings.HasPrefix(key, alongside.AnnotationPrefix) {
return true
}
}
return false
},
op.opClient.ApiextensionsInterface().ApiextensionsV1().CustomResourceDefinitions().Patch,
)); err != nil {
return nil, err
}

// Namespace sync for resolving subscriptions
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/operators/internal/alongside/alongside.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

const (
prefix = "operatorframework.io/installed-alongside-"
AnnotationPrefix = "operatorframework.io/installed-alongside-"
)

// NamespacedName is a reference to an object by namespace and name.
Expand All @@ -33,7 +33,7 @@ type Annotator struct{}
func (a Annotator) FromObject(o Annotatable) []NamespacedName {
var result []NamespacedName
for k, v := range o.GetAnnotations() {
if !strings.HasPrefix(k, prefix) {
if !strings.HasPrefix(k, AnnotationPrefix) {
continue
}
tokens := strings.Split(v, "/")
Expand All @@ -55,7 +55,7 @@ func (a Annotator) ToObject(o Annotatable, nns []NamespacedName) {
annotations := o.GetAnnotations()

for key := range annotations {
if strings.HasPrefix(key, prefix) {
if strings.HasPrefix(key, AnnotationPrefix) {
delete(annotations, key)
}
}
Expand All @@ -82,5 +82,5 @@ func key(n NamespacedName) string {
hasher.Write([]byte(n.Namespace))
hasher.Write([]byte{'/'})
hasher.Write([]byte(n.Name))
return fmt.Sprintf("%s%x", prefix, hasher.Sum64())
return fmt.Sprintf("%s%x", AnnotationPrefix, hasher.Sum64())
}
4 changes: 2 additions & 2 deletions pkg/controller/operators/internal/alongside/alongside_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestAnnotatorFromObject(t *testing.T) {
NamespacedNames []NamespacedName
}{
{
Name: "annotation without prefix ignored",
Name: "annotation without AnnotationPrefix ignored",
Object: TestAnnotatable{
"foo": "namespace/name",
},
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestAnnotatorToObject(t *testing.T) {
},
},
{
Name: "annotation without prefix ignored",
Name: "annotation without AnnotationPrefix ignored",
Object: TestAnnotatable{
"operatorframework.io/something-else": "",
},
Expand Down
Loading

0 comments on commit 8ccd442

Please sign in to comment.