Skip to content

Commit

Permalink
Introduce olmConfig controller (#2466)
Browse files Browse the repository at this point in the history
This commit introduces a controller for the olmConfig CRD.
The olmConfig CRD will be used to configure olm's behavior
on cluster. As of today, this CRD introduces the ability
for customer to disable copied csvs for operators installed
in allNamespace mode. When copied csv are disabled, an event
will be created in the operators namespace signaling that
it has no copied csvs and that users on the cluster may
have difficulty identifying which operators are available
in a given namespace.

Signed-off-by: Alexander Greene <greene.al1991@gmail.com>
  • Loading branch information
awgreene committed Dec 14, 2021
1 parent be0c556 commit 52f368d
Show file tree
Hide file tree
Showing 3 changed files with 545 additions and 3 deletions.
4 changes: 4 additions & 0 deletions deploy/chart/templates/0000_50_olm_02-olmconfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
apiVersion: operators.coreos.com/v1
kind: OLMConfig
metadata:
name: cluster
290 changes: 287 additions & 3 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
utilclock "k8s.io/apimachinery/pkg/util/clock"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand Down Expand Up @@ -68,6 +70,7 @@ type Operator struct {
copiedCSVLister operatorsv1alpha1listers.ClusterServiceVersionLister
ogQueueSet *queueinformer.ResourceQueueSet
csvQueueSet *queueinformer.ResourceQueueSet
olmConfigQueue workqueue.RateLimitingInterface
csvCopyQueueSet *queueinformer.ResourceQueueSet
copiedCSVGCQueueSet *queueinformer.ResourceQueueSet
objGCQueueSet *queueinformer.ResourceQueueSet
Expand Down Expand Up @@ -124,6 +127,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
client: config.externalClient,
ogQueueSet: queueinformer.NewEmptyResourceQueueSet(),
csvQueueSet: queueinformer.NewEmptyResourceQueueSet(),
olmConfigQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "olmConfig"),
csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(),
copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
objGCQueueSet: queueinformer.NewEmptyResourceQueueSet(),
Expand Down Expand Up @@ -433,6 +437,26 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
return nil, err
}

// Register QueueInformer for olmConfig
olmConfigInformer := externalversions.NewSharedInformerFactoryWithOptions(
op.client,
config.resyncPeriod(),
).Operators().V1().OLMConfigs().Informer()
olmConfigQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithInformer(olmConfigInformer),
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(op.olmConfigQueue),
queueinformer.WithIndexer(olmConfigInformer.GetIndexer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOLMConfig).ToSyncer()),
)
if err != nil {
return nil, err
}
if err := op.RegisterQueueInformer(olmConfigQueueInformer); err != nil {
return nil, err
}

k8sInformerFactory := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod())
clusterRoleInformer := k8sInformerFactory.Rbac().V1().ClusterRoles()
op.lister.RbacV1().RegisterClusterRoleLister(clusterRoleInformer.Lister())
Expand Down Expand Up @@ -1211,13 +1235,143 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
return
}

func (a *Operator) allNamespaceOperatorGroups() ([]*v1.OperatorGroup, error) {
operatorGroups, err := a.lister.OperatorsV1().OperatorGroupLister().List(labels.Everything())
if err != nil {
return nil, err
}

result := []*v1.OperatorGroup{}
for _, operatorGroup := range operatorGroups {
if NewNamespaceSet(operatorGroup.Status.Namespaces).IsAllNamespaces() {
result = append(result, operatorGroup)
}
}
return result, nil
}

func (a *Operator) syncOLMConfig(obj interface{}) (syncError error) {
a.logger.Info("Processing olmConfig")
olmConfig, ok := obj.(*v1.OLMConfig)
if !ok {
return fmt.Errorf("casting OLMConfig failed")
}

// Generate an array of allNamespace OperatorGroups
allNSOperatorGroups, err := a.allNamespaceOperatorGroups()
if err != nil {
return err
}

nonCopiedCSVRequirement, err := labels.NewRequirement(v1alpha1.CopiedLabelKey, selection.DoesNotExist, []string{})
if err != nil {
return err
}

csvIsRequeued := false
for _, og := range allNSOperatorGroups {
// Get all copied CSVs owned by this operatorGroup
copiedCSVRequirement, err := labels.NewRequirement(v1alpha1.CopiedLabelKey, selection.Equals, []string{og.GetNamespace()})
if err != nil {
return err
}

copiedCSVs, err := a.copiedCSVLister.List(labels.NewSelector().Add(*copiedCSVRequirement))
if err != nil {
return err
}

// Filter to unique copies
uniqueCopiedCSVs := map[string]struct{}{}
for _, copiedCSV := range copiedCSVs {
uniqueCopiedCSVs[copiedCSV.GetName()] = struct{}{}
}

csvs, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(og.GetNamespace()).List(labels.NewSelector().Add(*nonCopiedCSVRequirement))
if err != nil {
return err
}

for _, csv := range csvs {
// If the correct number of copied CSVs were found, continue
if _, ok := uniqueCopiedCSVs[csv.GetName()]; ok == olmConfig.CopiedCSVsAreEnabled() {
continue
}

if err := a.csvQueueSet.Requeue(csv.GetNamespace(), csv.GetName()); err != nil {
a.logger.WithError(err).Warn("unable to requeue")
}
csvIsRequeued = true
}
}

// Update the olmConfig status if it has changed.
condition := getCopiedCSVsCondition(!olmConfig.CopiedCSVsAreEnabled(), csvIsRequeued)
if !isStatusConditionPresentAndAreTypeReasonMessageStatusEqual(olmConfig.Status.Conditions, condition) {
meta.SetStatusCondition(&olmConfig.Status.Conditions, condition)
if _, err := a.client.OperatorsV1().OLMConfigs().UpdateStatus(context.TODO(), olmConfig, metav1.UpdateOptions{}); err != nil {
return err
}
}

return nil
}

func isStatusConditionPresentAndAreTypeReasonMessageStatusEqual(conditions []metav1.Condition, condition metav1.Condition) bool {
foundCondition := meta.FindStatusCondition(conditions, condition.Type)
if foundCondition == nil {
return false
}
return foundCondition.Type == condition.Type &&
foundCondition.Reason == condition.Reason &&
foundCondition.Message == condition.Message &&
foundCondition.Status == condition.Status
}

func getCopiedCSVsCondition(isDisabled, csvIsRequeued bool) metav1.Condition {
condition := metav1.Condition{
Type: v1.DisabledCopiedCSVsConditionType,
LastTransitionTime: metav1.Now(),
Status: metav1.ConditionFalse,
}
if !isDisabled {
condition.Reason = "CopiedCSVsEnabled"
condition.Message = "Copied CSVs are enabled and present accross the cluster"
if csvIsRequeued {
condition.Message = "Copied CSVs are enabled and at least one copied CSVs is missing"
}
return condition
}

if csvIsRequeued {
condition.Reason = "CopiedCSVsFound"
condition.Message = "Copied CSVs are disabled and at least one copied CSV was found for an operator installed in AllNamespace mode"
return condition
}

condition.Status = metav1.ConditionTrue
condition.Reason = "NoCopiedCSVsFound"
condition.Message = "Copied CSVs are disabled and none were found for operators installed in AllNamespace mode"

return condition
}

func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
clusterServiceVersion, ok := obj.(*v1alpha1.ClusterServiceVersion)
if !ok {
a.logger.Debugf("wrong type: %#v", obj)
return fmt.Errorf("casting ClusterServiceVersion failed")
}

olmConfig, err := a.client.OperatorsV1().OLMConfigs().Get(context.TODO(), "cluster", metav1.GetOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return err
}

if err == nil {
go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5)
}

logger := a.logger.WithFields(logrus.Fields{
"id": queueinformer.NewLoopID(),
"csv": clusterServiceVersion.GetName(),
Expand All @@ -1239,15 +1393,145 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
"targetNamespaces": strings.Join(operatorGroup.Status.Namespaces, ","),
}).Debug("copying csv to targets")

copiedCSVsAreEnabled, err := a.copiedCSVsAreEnabled()
if err != nil {
return err
}

// Check if we need to do any copying / annotation for the operatorgroup
if err := a.ensureCSVsInNamespaces(clusterServiceVersion, operatorGroup, NewNamespaceSet(operatorGroup.Status.Namespaces)); err != nil {
logger.WithError(err).Info("couldn't copy CSV to target namespaces")
syncError = err
namespaceSet := NewNamespaceSet(operatorGroup.Status.Namespaces)
if copiedCSVsAreEnabled || !namespaceSet.IsAllNamespaces() {
if err := a.ensureCSVsInNamespaces(clusterServiceVersion, operatorGroup, namespaceSet); err != nil {
logger.WithError(err).Info("couldn't copy CSV to target namespaces")
syncError = err
}

// If the CSV was installed in AllNamespace mode, remove any "CSV Copying Disabled" events
// in which the related object's name, namespace, and uid match the given CSV's.
if namespaceSet.IsAllNamespaces() {
if err := a.deleteCSVCopyingDisabledEvent(clusterServiceVersion); err != nil {
return err
}
}
return
}

requirement, err := labels.NewRequirement(v1alpha1.CopiedLabelKey, selection.Equals, []string{clusterServiceVersion.Namespace})
if err != nil {
return err
}

copiedCSVs, err := a.copiedCSVLister.List(labels.NewSelector().Add(*requirement))
if err != nil {
return err
}

for _, copiedCSV := range copiedCSVs {
err := a.client.OperatorsV1alpha1().ClusterServiceVersions(copiedCSV.Namespace).Delete(context.TODO(), copiedCSV.Name, metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return err
}
}

if err := a.createCSVCopyingDisabledEvent(clusterServiceVersion); err != nil {
return err
}

return
}

// copiedCSVsAreEnabled determines if csv copying is enabled for OLM.
//
// This method will first attempt to get the "cluster" olmConfig resource,
// if any error other than "IsNotFound" is encountered, false and the error
// will be returned.
//
// If the "cluster" olmConfig resource is found, the value of
// olmConfig.spec.features.disableCopiedCSVs will be returned along with a
// nil error.
//
// If the "cluster" olmConfig resource is not found, true will be returned
// without an error.
func (a *Operator) copiedCSVsAreEnabled() (bool, error) {
olmConfig, err := a.client.OperatorsV1().OLMConfigs().Get(context.TODO(), "cluster", metav1.GetOptions{})
if err != nil {
// Default to true if olmConfig singleton cannot be found
if k8serrors.IsNotFound(err) {
return true, nil
}
// If there was an error that wasn't an IsNotFound, return the error
return false, err
}

// If there was no error, return value based on olmConfig singleton
return olmConfig.CopiedCSVsAreEnabled(), nil
}

func (a *Operator) getCopiedCSVDisabledEventsForCSV(csv *v1alpha1.ClusterServiceVersion) ([]corev1.Event, error) {
result := []corev1.Event{}
if csv == nil {
return result, nil
}

events, err := a.opClient.KubernetesInterface().CoreV1().Events(csv.GetNamespace()).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}

for _, event := range events.Items {
if event.InvolvedObject.Namespace == csv.GetNamespace() &&
event.InvolvedObject.Name == csv.GetName() &&
event.InvolvedObject.UID == csv.GetUID() &&
event.Reason == v1.DisabledCopiedCSVsConditionType {
result = append(result, event)
}
}

return result, nil
}

func (a *Operator) deleteCSVCopyingDisabledEvent(csv *v1alpha1.ClusterServiceVersion) error {
events, err := a.getCopiedCSVDisabledEventsForCSV(csv)
if err != nil {
return err
}

// Remove existing events.
return a.deleteEvents(events)
}

func (a *Operator) deleteEvents(events []corev1.Event) error {
for _, event := range events {
err := a.opClient.KubernetesInterface().EventsV1().Events(event.GetNamespace()).Delete(context.TODO(), event.GetName(), metav1.DeleteOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return err
}
}
return nil
}

func (a *Operator) createCSVCopyingDisabledEvent(csv *v1alpha1.ClusterServiceVersion) error {
events, err := a.getCopiedCSVDisabledEventsForCSV(csv)
if err != nil {
return err
}

if len(events) == 1 {
return nil
}

// Remove existing events.
if len(events) > 1 {
if err := a.deleteEvents(events); err != nil {
return err
}
}

a.recorder.Eventf(csv, corev1.EventTypeWarning, v1.DisabledCopiedCSVsConditionType, "CSV copying disabled for %s/%s", csv.GetNamespace(), csv.GetName())

return nil
}

func (a *Operator) syncGcCsv(obj interface{}) (syncError error) {
clusterServiceVersion, ok := obj.(*v1alpha1.ClusterServiceVersion)
if !ok {
Expand Down
Loading

0 comments on commit 52f368d

Please sign in to comment.